diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8eda5de7cb..a063f39d92 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -241,6 +241,24 @@ typedef struct { SEpSet epset; } SDownstreamTaskEpset; +typedef enum { + TASK_SCANHISTORY_CONT = 0x1, + TASK_SCANHISTORY_QUIT = 0x2, + TASK_SCANHISTORY_REXEC = 0x3, +} EScanHistoryRet; + +typedef struct { + EScanHistoryRet ret; + int32_t idleTime; +} SScanhistoryDataInfo; + +typedef struct { + int32_t idleDuration; // idle time before use time slice the continue execute scan-history + int32_t numOfTicks; + tmr_h pTimer; + int32_t execCount; +} SScanhistorySchedInfo; + typedef struct { int64_t stbUid; char stbFullName[TSDB_TABLE_FNAME_LEN]; @@ -378,9 +396,10 @@ typedef struct STaskOutputInfo { union { STaskDispatcherFixed fixedDispatcher; STaskDispatcherShuffle shuffleDispatcher; - STaskSinkTb tbSink; - STaskSinkSma smaSink; - STaskSinkFetch fetchSink; + + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; }; int8_t type; STokenBucket* pTokenBucket; @@ -414,7 +433,10 @@ struct SStreamTask { SStreamState* pState; // state backend SArray* pRspMsgList; SUpstreamInfo upstreamInfo; + // the followings attributes don't be serialized + SScanhistorySchedInfo schedHistoryInfo; + int32_t notReadyTasks; int32_t numOfWaitingUpstream; int64_t checkReqId; @@ -734,8 +756,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); -int32_t onNormalTaskReady(SStreamTask* pTask); -int32_t onScanhistoryTaskReady(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); @@ -757,7 +777,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); +int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); + int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common @@ -777,21 +799,14 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); -typedef enum { - TASK_SCANHISTORY_CONT = 0x1, - TASK_SCANHISTORY_QUIT = 0x2, - TASK_SCANHISTORY_REXEC = 0x3, -} EScanHistoryRet; - // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -EScanHistoryRet streamScanHistoryData(SStreamTask* pTask); +SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, - SRpcHandleInfo* pRpcInfo); +int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 04eb13423d..4bdb7a2032 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1105,6 +1105,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask } } +static void ddxx() { + +} + // this function should be executed by only one thread, so we set an sentinel to protect this function int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; @@ -1168,18 +1172,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - EScanHistoryRet ret = streamScanHistoryData(pTask); + SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask); // todo update the step1 exec elapsed time double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; - if (ret == TASK_SCANHISTORY_QUIT || ret == TASK_SCANHISTORY_REXEC) { + if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) { int8_t status = streamTaskSetSchedStatusInactive(pTask); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); - if (ret == TASK_SCANHISTORY_REXEC) { - // todo wait for 100ms and retry - streamStartScanHistoryAsync(pTask, 0); + if (retInfo.ret == TASK_SCANHISTORY_REXEC) { + streamReExecScanHistoryFuture(pTask, retInfo.idleTime); } else { char* p = NULL; ETaskStatus s = streamTaskGetStatus(pTask, &p); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 51d51ebbef..4b64737936 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -25,7 +25,7 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; -static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); +static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2370890006..52e67ef456 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -22,6 +22,8 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); +static bool taskReadyForDataFromWal(SStreamTask* pTask); +static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -384,14 +386,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { return false; } -static bool taskReadyForDataFromWal(SStreamTask* pTask) { +bool taskReadyForDataFromWal(SStreamTask* pTask) { // non-source or fill-history tasks don't need to response the WAL scan action. if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { return false; } // not in ready state, do not handle the data from wal -// int32_t status = pTask->status.taskStatus; char* p = NULL; int32_t status = streamTaskGetStatus(pTask, &p); if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) { @@ -423,7 +424,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 09d26119f1..aa9b26381e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -140,6 +140,9 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +int32_t onNormalTaskReady(SStreamTask* pTask); +int32_t onScanhistoryTaskReady(SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index edfc66762d..5665e7a917 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, info.msg.info = *pRpcInfo; taosThreadMutexLock(&pTask->lock); - stDebug("s-task:%s lock", pTask->id.idStr); if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fa042b5687..d81a5f0c2f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -187,7 +187,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { +SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); int32_t code = TSDB_CODE_SUCCESS; @@ -201,7 +201,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { if (streamTaskShouldPause(pTask)) { double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); - return TASK_SCANHISTORY_QUIT; // quit from step1, not continue to handle the step2 + return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; // quit from step1, not continue to handle the step2 } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -216,13 +216,13 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { while (1) { if (streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return TASK_SCANHISTORY_QUIT; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return TASK_SCANHISTORY_REXEC; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; } SSDataBlock* output = NULL; @@ -261,7 +261,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { terrno = code; stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code)); - return TASK_SCANHISTORY_REXEC; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } else { taosArrayDestroy(pRes); @@ -271,13 +271,11 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, pTask->info.fillHistory, pTask->info.taskLevel); - - // todo exec scanhistory in 100ms - return TASK_SCANHISTORY_REXEC; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } - return TASK_SCANHISTORY_CONT; + return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};; } // wait for the stream task to be idle diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index fdad5124ed..65a14aff6b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -19,6 +19,10 @@ #include "wal.h" #include "streamsm.h" +#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms +#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec +#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE) + typedef struct SLaunchHTaskInfo { SStreamMeta* pMeta; STaskId id; @@ -81,6 +85,50 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } +static void doReExecScanhistory(void* param, void* tmrId) { + SStreamTask* pTask = param; + pTask->schedHistoryInfo.numOfTicks -= 1; + + char* p = NULL; + ETaskStatus status = streamTaskGetStatus(pTask, &p); + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref); + } + + if (pTask->schedHistoryInfo.numOfTicks <= 0) { + streamStartScanHistoryAsync(pTask, 0); + + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, + pTask->info.fillHistory, ref); + } else { + taosTmrReset(doReExecScanhistory, 100, pTask, NULL, pTask->schedHistoryInfo.pTimer); + } +} + +int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) { + int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; + if (numOfTicks <= 0) { + numOfTicks = 1; + } else if (numOfTicks > SCANHISTORY_IDLE_TICK) { + numOfTicks = SCANHISTORY_IDLE_TICK; + } + + pTask->schedHistoryInfo.numOfTicks = numOfTicks; + + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); + + if (pTask->schedHistoryInfo.pTimer == NULL) { + pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer); + } else { + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, pTask->schedHistoryInfo.pTimer); + } + + return TSDB_CODE_SUCCESS; +} + static int32_t doStartScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { @@ -684,9 +732,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { int32_t hTaskId = pHTaskInfo->id.taskId; streamTaskGetStatus(pTask, &p); - stDebug( - "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", - pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); + stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", + pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask);