feat(stream): support the force_window_close trigger.

This commit is contained in:
Haojun Liao 2024-09-19 16:23:51 +08:00
parent b8cd739383
commit e1d8e98b11
18 changed files with 187 additions and 125 deletions

View File

@ -153,7 +153,6 @@ enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,

View File

@ -221,8 +221,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);
void qResetTaskInfoCode(qTaskInfo_t tinfo);
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);

View File

@ -313,10 +313,12 @@ typedef struct SSTaskBasicInfo {
SEpSet epSet;
SEpSet mnodeEpset; // mnode epset for send heartbeat
int32_t selfChildId;
int32_t totalLevel;
int32_t trigger;
int8_t taskLevel;
int8_t fillHistory; // is fill history task or not
int64_t delaySchedParam; // in msec
int64_t watermark; // extracted from operators
SInterval interval;
} SSTaskBasicInfo;
typedef struct SStreamRetrieveReq SStreamRetrieveReq;
@ -541,8 +543,9 @@ typedef struct STaskUpdateEntry {
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** pTask);
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
SStreamTask** pTask);
void tFreeStreamTask(SStreamTask* pTask);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);

View File

@ -242,7 +242,7 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask = NULL;
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory,
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *pTaskList, pStream->conf.fillHistory,
pStream->subTableWithoutMd5, &pTask);
if (code != 0) {
return code;
@ -356,8 +356,9 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pTask);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
pStream->subTableWithoutMd5, pTask);
return code;
}
@ -395,17 +396,17 @@ static void setHTasksId(SStreamObj* pStream) {
}
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
SStreamTask* pTask = NULL;
int32_t code = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory);
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isHistoryTask);
if (pStream->conf.fillHistory) {
haltInitialTaskStatus(pTask, plan, isFillhistory);
haltInitialTaskStatus(pTask, plan, isHistoryTask);
}
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
@ -451,10 +452,12 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
void* pIter = NULL;
int32_t code = 0;
SSdb* pSdb = pMnode->pSdb;
addNewTaskList(pStream);
void* pIter = NULL;
SSdb* pSdb = pMnode->pSdb;
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
@ -467,10 +470,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
continue;
}
int code =
doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
if (code != 0) {
mError("create stream task, code:%s", tstrerror(code));
mError("failed to create stream task, code:%s", tstrerror(code));
// todo drop the added source tasks.
sdbRelease(pSdb, pVgroup);
@ -502,9 +504,9 @@ static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhist
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
int32_t code =
tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pAggTask);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
pStream->subTableWithoutMd5, pAggTask);
return code;
}
@ -678,7 +680,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
// add extra sink
hasExtraSink = true;
int32_t code = addSinkTask(pMnode, pStream, pEpset);
code = addSinkTask(pMnode, pStream, pEpset);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -692,7 +694,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
if (code != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -461,17 +461,16 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->outputSchema.pSchema = pFullSchema;
}
bool hasKey = hasDestPrimaryKey(&pObj->outputSchema);
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.triggerType = (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY)? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
.destHasPrimaryKey = hasKey,
.destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
};
// using ast and param to build physical plan

View File

@ -38,7 +38,6 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
streamTaskOpenAllUpstreamInput(pTask);
streamTaskResetUpstreamStageInfo(pTask);
(void)streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask);

View File

@ -709,8 +709,9 @@ static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*)pTqObj;
int32_t vgId = TD_VID(pTq->pVnode);
SCheckpointInfo* pChkInfo = NULL;
tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
@ -752,9 +753,8 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
}
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask).name;
@ -874,13 +874,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = NULL;
SStreamTask* pStreamTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
pMeta->vgId, pReq->taskId);
return -1;
return code;
}
// do recovery step1
@ -945,11 +946,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ETaskStatus s = p.state;
if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
el, pTask->execInfo.step1El, status);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr,
p.name, pTask->execInfo.step1El);
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
pTask->execInfo.step1El);
}
}
@ -966,7 +967,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
// 1. get the related stream task
SStreamTask* pStreamTask = NULL;
code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
@ -977,15 +977,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
return code; // todo: handle failure
return code;
}
if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
streamMetaReleaseTask(pMeta, pStreamTask);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);

View File

@ -259,7 +259,12 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
bool taskReadyForDataFromWal(SStreamTask* pTask) {
// non-source or fill-history tasks don't need to response the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
SSTaskBasicInfo* pInfo = &pTask->info;
if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
return false;
}
if (pInfo->taskLevel == TASK_LEVEL__SOURCE && pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return false;
}
@ -271,7 +276,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
}
// fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
if ((pInfo->fillHistory == 1) && pTask->status.appendTranstateBlock) {
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
@ -397,9 +402,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState.name);
SStreamTaskState state = streamTaskGetStatus(pTask);
if (state.state != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;

View File

@ -83,6 +83,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
streamSetupScheduleTrigger(pTask);
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);
@ -590,6 +592,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
int32_t taskId = -1;
int64_t streamId = -1;
bool added = false;
int32_t size = sizeof(SStreamTask);
if (tsDisableStream) {
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
@ -599,7 +602,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
// 1.deserialize msg and build task
int32_t size = sizeof(SStreamTask);
SStreamTask* pTask = taosMemoryCalloc(1, size);
if (pTask == NULL) {
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);

View File

@ -625,9 +625,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
break;
case TDMT_STREAM_TASK_DEPLOY: {
int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != 0) {
goto _err;
}
} break;

View File

@ -131,7 +131,7 @@ static void clearStreamBlock(SOperatorInfo* pOperator) {
}
}
void resetTaskInfo(qTaskInfo_t tinfo) {
void qResetTaskInfoCode(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->code = 0;
clearStreamBlock(pTaskInfo->pRoot);
@ -1085,6 +1085,10 @@ _end:
return code;
}
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval) {
return 0;
}
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;

View File

@ -239,7 +239,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
int64_t checkpointId, SRpcMsg* pMsg);
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger);
#ifdef __cplusplus
}

View File

@ -15,6 +15,42 @@
#include "streamInt.h"
static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
*pSubmit = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit);
if (code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData));
if ((*pSubmit)->submits == NULL) {
taosFreeQitem(*pSubmit);
*pSubmit = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT;
return TSDB_CODE_SUCCESS;
}
static int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
void* p = taosArrayPush(pMerged->submits, &pSubmit->submit);
if (p == NULL) {
return terrno;
}
if (pSubmit->ver > pMerged->ver) {
pMerged->ver = pSubmit->ver;
}
return 0;
}
static void freeItems(void* param) {
SSDataBlock* pBlock = param;
taosArrayDestroy(pBlock->pDataBlock);
}
int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, SStreamDataBlock** pRes) {
SStreamDataBlock* pData = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData);
@ -179,37 +215,6 @@ void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
}
}
int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
*pSubmit = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit);
if (code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData));
if ((*pSubmit)->submits == NULL) {
taosFreeQitem(*pSubmit);
*pSubmit = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT;
return TSDB_CODE_SUCCESS;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
void* p = taosArrayPush(pMerged->submits, &pSubmit->submit);
if (p == NULL) {
return terrno;
}
if (pSubmit->ver > pMerged->ver) {
pMerged->ver = pSubmit->ver;
}
return 0;
}
// todo handle memory error
int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) {
*pRes = NULL;
@ -267,11 +272,6 @@ int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem
}
}
static void freeItems(void* param) {
SSDataBlock* pBlock = param;
taosArrayDestroy(pBlock->pDataBlock);
}
void streamFreeQitem(SStreamQueueItem* data) {
int8_t type = data->type;
if (type == STREAM_INPUT__GET_RES) {
@ -306,3 +306,36 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(pBlock);
}
}
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger) {
QRY_PARAM_CHECK(pTrigger);
SStreamTrigger* p = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
if (code) {
return code;
}
p->type = STREAM_INPUT__GET_RES;
p->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (p->pBlock == NULL) {
taosFreeQitem(p);
return terrno;
}
p->pBlock->info.type = STREAM_GET_ALL;
// let's calculate the previous time window
// todo get the time precision for ts
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
SInterval interval = {.interval = trigger, .sliding = trigger, .intervalUnit = 'a', .slidingUnit = 'a'};
int64_t now = taosGetTimestampMs();
STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger);
p->pBlock->info.window = window;
}
*pTrigger = p;
return code;
}

View File

@ -119,7 +119,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
resetTaskInfo(pExecutor);
qResetTaskInfoCode(pExecutor);
}
stError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code));

View File

@ -750,7 +750,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
}
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (code) {
if (code) { // todo remove it from task list
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
return code;
}

View File

@ -613,7 +613,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
@ -692,7 +692,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

View File

@ -20,15 +20,40 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam);
pTask->schedInfo.pDelayTimer =
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
int64_t delay = 0;
int32_t code = 0;
const char* id = pTask->id.idStr;
if (pTask->info.fillHistory == 1) {
return;
}
// dynamic set the trigger & triggerParam for STREAM_TRIGGER_FORCE_WINDOW_CLOSE
if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
int64_t waterMark = 0;
SInterval interval = {0};
code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval);
if (code == 0) {
pTask->info.delaySchedParam = interval.sliding;
pTask->info.watermark = waterMark;
pTask->info.interval = interval;
}
// todo: calculate the correct start delay time for force_window_close
delay = pTask->info.delaySchedParam;
stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64
" unit:%c ",
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit);
}
if (delay == 0) {
return;
}
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", id, ref, pTask->info.delaySchedParam);
pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)delay, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
int32_t streamTrySchedExec(SStreamTask* pTask) {
@ -135,32 +160,22 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* pTrigger;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&pTrigger);
int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam);
if (code) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code),
nextTrigger);
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
terrno = code;
return;
}
pTrigger->type = STREAM_INPUT__GET_RES;
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger);
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
return;
}
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
pTrigger->pBlock->info.type = STREAM_GET_ALL;
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
return;
}

View File

@ -103,8 +103,9 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
return pEpInfo;
}
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) {
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
SStreamTask** p) {
*p = NULL;
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
@ -120,6 +121,7 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory;
pTask->info.trigger = trigger;
pTask->info.delaySchedParam = triggerParam;
pTask->subtableWithoutMd5 = subtableWithoutMd5;