Merge branch 'fix/TD-30837' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-09-19 19:07:41 +08:00
commit 2938233519
18 changed files with 185 additions and 123 deletions

View File

@ -153,7 +153,6 @@ enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT, STREAM_INPUT__MERGED_SUBMIT,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,

View File

@ -221,8 +221,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo); void qResetTaskInfoCode(qTaskInfo_t tinfo);
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);

View File

@ -309,14 +309,16 @@ typedef struct SDataRange {
} SDataRange; } SDataRange;
typedef struct SSTaskBasicInfo { typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id int32_t nodeId; // vgroup id or snode id
SEpSet epSet; SEpSet epSet;
SEpSet mnodeEpset; // mnode epset for send heartbeat SEpSet mnodeEpset; // mnode epset for send heartbeat
int32_t selfChildId; int32_t selfChildId;
int32_t totalLevel; int32_t trigger;
int8_t taskLevel; int8_t taskLevel;
int8_t fillHistory; // is fill history task or not int8_t fillHistory; // is fill history task or not
int64_t delaySchedParam; // in msec int64_t delaySchedParam; // in msec
int64_t watermark; // extracted from operators
SInterval interval;
} SSTaskBasicInfo; } SSTaskBasicInfo;
typedef struct SStreamRetrieveReq SStreamRetrieveReq; typedef struct SStreamRetrieveReq SStreamRetrieveReq;
@ -548,8 +550,9 @@ typedef struct STaskUpdateEntry {
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** pTask); int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
SStreamTask** pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);

View File

@ -242,7 +242,7 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory, int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *pTaskList, pStream->conf.fillHistory,
pStream->subTableWithoutMd5, &pTask); pStream->subTableWithoutMd5, &pTask);
if (code != 0) { if (code != 0) {
return code; return code;
@ -356,8 +356,9 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pTask); useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
pStream->subTableWithoutMd5, pTask);
return code; return code;
} }
@ -395,17 +396,17 @@ static void setHTasksId(SStreamObj* pStream) {
} }
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask); int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isHistoryTask);
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
haltInitialTaskStatus(pTask, plan, isFillhistory); haltInitialTaskStatus(pTask, plan, isHistoryTask);
} }
streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
@ -451,10 +452,12 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
void* pIter = NULL;
int32_t code = 0;
SSdb* pSdb = pMnode->pSdb;
addNewTaskList(pStream); addNewTaskList(pStream);
void* pIter = NULL;
SSdb* pSdb = pMnode->pSdb;
while (1) { while (1) {
SVgObj* pVgroup; SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
@ -467,10 +470,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
continue; continue;
} }
int code = code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
if (code != 0) { if (code != 0) {
mError("create stream task, code:%s", tstrerror(code)); mError("failed to create stream task, code:%s", tstrerror(code));
// todo drop the added source tasks. // todo drop the added source tasks.
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
@ -502,9 +504,9 @@ static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhist
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
int32_t code = int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
*pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pAggTask); pStream->subTableWithoutMd5, pAggTask);
return code; return code;
} }
@ -678,7 +680,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
// add extra sink // add extra sink
hasExtraSink = true; hasExtraSink = true;
int32_t code = addSinkTask(pMnode, pStream, pEpset); code = addSinkTask(pMnode, pStream, pEpset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -692,7 +694,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

View File

@ -461,17 +461,16 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->outputSchema.pSchema = pFullSchema; pObj->outputSchema.pSchema = pFullSchema;
} }
bool hasKey = hasDestPrimaryKey(&pObj->outputSchema);
SPlanContext cxt = { SPlanContext cxt = {
.pAstRoot = pAst, .pAstRoot = pAst,
.topicQuery = false, .topicQuery = false,
.streamQuery = true, .streamQuery = true,
.triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, .triggerType = (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY)? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark, .watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired, .igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark, .deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate, .igCheckUpdate = pObj->igCheckUpdate,
.destHasPrimaryKey = hasKey, .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema),
}; };
// using ast and param to build physical plan // using ast and param to build physical plan

View File

@ -38,7 +38,6 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
streamTaskOpenAllUpstreamInput(pTask); streamTaskOpenAllUpstreamInput(pTask);
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);
(void)streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo; SCheckpointInfo *pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);

View File

@ -711,8 +711,10 @@ end:
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
STQ* pTq = (STQ*)pTqObj; STQ* pTq = (STQ*)pTqObj;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SCheckpointInfo* pChkInfo = NULL;
tqDebug("s-task:0x%x start to build task", pTask->id.taskId); tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
@ -763,9 +765,8 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
} }
streamTaskResetUpstreamStageInfo(pTask); streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo; pChkInfo = &pTask->chkInfo;
tqSetRestoreVersionInfo(pTask); tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask).name; char* p = streamTaskGetStatus(pTask).name;
@ -885,13 +886,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = NULL;
SStreamTask* pStreamTask = NULL;
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed", tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
pMeta->vgId, pReq->taskId); pMeta->vgId, pReq->taskId);
return -1; return code;
} }
// do recovery step1 // do recovery step1
@ -956,11 +958,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ETaskStatus s = p.state; ETaskStatus s = p.state;
if (s == TASK_STATUS__PAUSE) { if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
el, pTask->execInfo.step1El, status); pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
p.name, pTask->execInfo.step1El); pTask->execInfo.step1El);
} }
} }
@ -977,7 +979,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
// 1. get the related stream task // 1. get the related stream task
SStreamTask* pStreamTask = NULL;
code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask); code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
@ -988,15 +989,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; // todo: handle failure return code;
} }
if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel); tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
return TSDB_CODE_STREAM_INTERNAL_ERROR; return TSDB_CODE_STREAM_INTERNAL_ERROR;
} }
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);

View File

@ -259,7 +259,12 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
bool taskReadyForDataFromWal(SStreamTask* pTask) { bool taskReadyForDataFromWal(SStreamTask* pTask) {
// non-source or fill-history tasks don't need to response the WAL scan action. // non-source or fill-history tasks don't need to response the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { SSTaskBasicInfo* pInfo = &pTask->info;
if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
return false;
}
if (pInfo->taskLevel == TASK_LEVEL__SOURCE && pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return false; return false;
} }
@ -271,7 +276,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
} }
// fill-history task has entered into the last phase, no need to anything // fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { if ((pInfo->fillHistory == 1) && pTask->status.appendTranstateBlock) {
// the maximum version of data in the WAL has reached already, the step2 is done // the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer); pTask->dataRange.range.maxVer);
@ -378,7 +383,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList); numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i); STaskId* pTaskId = taosArrayGet(pTaskList, i);
if (pTaskId == NULL) { if (pTaskId == NULL) {
continue; continue;
} }
@ -408,9 +413,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask); SStreamTaskState state = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__READY) { if (state.state != TASK_STATUS__READY) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState.name); tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;

View File

@ -89,6 +89,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
} }
} }
streamSetupScheduleTrigger(pTask);
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el); tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);
@ -606,6 +608,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
int32_t taskId = -1; int32_t taskId = -1;
int64_t streamId = -1; int64_t streamId = -1;
bool added = false; bool added = false;
int32_t size = sizeof(SStreamTask);
if (tsDisableStream) { if (tsDisableStream) {
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
@ -615,7 +618,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
// 1.deserialize msg and build task // 1.deserialize msg and build task
int32_t size = sizeof(SStreamTask);
SStreamTask* pTask = taosMemoryCalloc(1, size); SStreamTask* pTask = taosMemoryCalloc(1, size);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);

View File

@ -625,9 +625,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} }
break; break;
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != 0) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _err; goto _err;
} }
} break; } break;

View File

@ -131,7 +131,7 @@ static void clearStreamBlock(SOperatorInfo* pOperator) {
} }
} }
void resetTaskInfo(qTaskInfo_t tinfo) { void qResetTaskInfoCode(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
pTaskInfo->code = 0; pTaskInfo->code = 0;
clearStreamBlock(pTaskInfo->pRoot); clearStreamBlock(pTaskInfo->pRoot);
@ -1093,6 +1093,10 @@ _end:
return code; return code;
} }
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval) {
return 0;
}
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;

View File

@ -239,7 +239,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
int64_t checkpointId, SRpcMsg* pMsg); int64_t checkpointId, SRpcMsg* pMsg);
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -15,6 +15,42 @@
#include "streamInt.h" #include "streamInt.h"
static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
*pSubmit = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit);
if (code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData));
if ((*pSubmit)->submits == NULL) {
taosFreeQitem(*pSubmit);
*pSubmit = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT;
return TSDB_CODE_SUCCESS;
}
static int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
void* p = taosArrayPush(pMerged->submits, &pSubmit->submit);
if (p == NULL) {
return terrno;
}
if (pSubmit->ver > pMerged->ver) {
pMerged->ver = pSubmit->ver;
}
return 0;
}
static void freeItems(void* param) {
SSDataBlock* pBlock = param;
taosArrayDestroy(pBlock->pDataBlock);
}
int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, SStreamDataBlock** pRes) { int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, SStreamDataBlock** pRes) {
SStreamDataBlock* pData = NULL; SStreamDataBlock* pData = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData); int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData);
@ -179,37 +215,6 @@ void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
} }
} }
int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
*pSubmit = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit);
if (code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData));
if ((*pSubmit)->submits == NULL) {
taosFreeQitem(*pSubmit);
*pSubmit = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT;
return TSDB_CODE_SUCCESS;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
void* p = taosArrayPush(pMerged->submits, &pSubmit->submit);
if (p == NULL) {
return terrno;
}
if (pSubmit->ver > pMerged->ver) {
pMerged->ver = pSubmit->ver;
}
return 0;
}
// todo handle memory error // todo handle memory error
int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) { int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) {
*pRes = NULL; *pRes = NULL;
@ -267,11 +272,6 @@ int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem
} }
} }
static void freeItems(void* param) {
SSDataBlock* pBlock = param;
taosArrayDestroy(pBlock->pDataBlock);
}
void streamFreeQitem(SStreamQueueItem* data) { void streamFreeQitem(SStreamQueueItem* data) {
int8_t type = data->type; int8_t type = data->type;
if (type == STREAM_INPUT__GET_RES) { if (type == STREAM_INPUT__GET_RES) {
@ -306,3 +306,36 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
} }
} }
int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger) {
QRY_PARAM_CHECK(pTrigger);
SStreamTrigger* p = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p);
if (code) {
return code;
}
p->type = STREAM_INPUT__GET_RES;
p->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (p->pBlock == NULL) {
taosFreeQitem(p);
return terrno;
}
p->pBlock->info.type = STREAM_GET_ALL;
// let's calculate the previous time window
// todo get the time precision for ts
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
SInterval interval = {.interval = trigger, .sliding = trigger, .intervalUnit = 'a', .slidingUnit = 'a'};
int64_t now = taosGetTimestampMs();
STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger);
p->pBlock->info.window = window;
}
*pTrigger = p;
return code;
}

View File

@ -119,7 +119,7 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
uint64_t ts = 0; uint64_t ts = 0;
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) { if (code == TSDB_CODE_QRY_IN_EXEC) {
resetTaskInfo(pExecutor); qResetTaskInfoCode(pExecutor);
} }
if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) { if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) {

View File

@ -711,7 +711,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
if (code) { if (code) { // todo remove it from task list
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
return code; return code;
} }

View File

@ -613,7 +613,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type));
TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType));
@ -692,7 +692,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type));
TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType));

View File

@ -20,16 +20,41 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) { void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t delaySchema = pTask->info.delaySchedParam; int64_t delay = 0;
if (delaySchema != 0 && pTask->info.fillHistory == 0) { int32_t code = 0;
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); const char* id = pTask->id.idStr;
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, if (pTask->info.fillHistory == 1) {
pTask->info.delaySchedParam); return;
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
} }
// dynamic set the trigger & triggerParam for STREAM_TRIGGER_FORCE_WINDOW_CLOSE
if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
int64_t waterMark = 0;
SInterval interval = {0};
code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval);
if (code == 0) {
pTask->info.delaySchedParam = interval.sliding;
pTask->info.watermark = waterMark;
pTask->info.interval = interval;
}
// todo: calculate the correct start delay time for force_window_close
delay = pTask->info.delaySchedParam;
stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64
" unit:%c ",
id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit);
}
if (delay == 0) {
return;
}
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", id, ref, pTask->info.delaySchedParam);
streamTmrStart(streamTaskSchedHelper, delay, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
} }
int32_t streamTrySchedExec(SStreamTask* pTask) { int32_t streamTrySchedExec(SStreamTask* pTask) {
@ -131,26 +156,15 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (status == TASK_TRIGGER_STATUS__ACTIVE) { if (status == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* pTrigger; SStreamTrigger* pTrigger;
int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&pTrigger); int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam);
if (code) { if (code) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code),
nextTrigger); nextTrigger);
terrno = code; terrno = code;
goto _end; goto _end;
} }
pTrigger->type = STREAM_INPUT__GET_RES;
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger);
stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id,
nextTrigger);
goto _end;
}
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
pTrigger->pBlock->info.type = STREAM_GET_ALL;
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -103,8 +103,9 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
return pEpInfo; return pEpInfo;
} }
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5,
SStreamTask** p) {
*p = NULL; *p = NULL;
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
@ -120,6 +121,7 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
pTask->info.taskLevel = taskLevel; pTask->info.taskLevel = taskLevel;
pTask->info.fillHistory = fillHistory; pTask->info.fillHistory = fillHistory;
pTask->info.trigger = trigger;
pTask->info.delaySchedParam = triggerParam; pTask->info.delaySchedParam = triggerParam;
pTask->subtableWithoutMd5 = subtableWithoutMd5; pTask->subtableWithoutMd5 = subtableWithoutMd5;