fix(stream): exec scan-history in timer.
This commit is contained in:
parent
7370daa6e0
commit
8c2f6bab0e
|
@ -241,6 +241,24 @@ typedef struct {
|
|||
SEpSet epset;
|
||||
} SDownstreamTaskEpset;
|
||||
|
||||
typedef enum {
|
||||
TASK_SCANHISTORY_CONT = 0x1,
|
||||
TASK_SCANHISTORY_QUIT = 0x2,
|
||||
TASK_SCANHISTORY_REXEC = 0x3,
|
||||
} EScanHistoryRet;
|
||||
|
||||
typedef struct {
|
||||
EScanHistoryRet ret;
|
||||
int32_t idleTime;
|
||||
} SScanhistoryDataInfo;
|
||||
|
||||
typedef struct {
|
||||
int32_t idleDuration; // idle time before use time slice the continue execute scan-history
|
||||
int32_t numOfTicks;
|
||||
tmr_h pTimer;
|
||||
int32_t execCount;
|
||||
} SScanhistorySchedInfo;
|
||||
|
||||
typedef struct {
|
||||
int64_t stbUid;
|
||||
char stbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
|
@ -378,9 +396,10 @@ typedef struct STaskOutputInfo {
|
|||
union {
|
||||
STaskDispatcherFixed fixedDispatcher;
|
||||
STaskDispatcherShuffle shuffleDispatcher;
|
||||
STaskSinkTb tbSink;
|
||||
STaskSinkSma smaSink;
|
||||
STaskSinkFetch fetchSink;
|
||||
|
||||
STaskSinkTb tbSink;
|
||||
STaskSinkSma smaSink;
|
||||
STaskSinkFetch fetchSink;
|
||||
};
|
||||
int8_t type;
|
||||
STokenBucket* pTokenBucket;
|
||||
|
@ -414,7 +433,10 @@ struct SStreamTask {
|
|||
SStreamState* pState; // state backend
|
||||
SArray* pRspMsgList;
|
||||
SUpstreamInfo upstreamInfo;
|
||||
|
||||
// the followings attributes don't be serialized
|
||||
SScanhistorySchedInfo schedHistoryInfo;
|
||||
|
||||
int32_t notReadyTasks;
|
||||
int32_t numOfWaitingUpstream;
|
||||
int64_t checkReqId;
|
||||
|
@ -734,8 +756,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)
|
|||
|
||||
// recover and fill history
|
||||
void streamTaskCheckDownstream(SStreamTask* pTask);
|
||||
int32_t onNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
|
||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||
|
@ -757,7 +777,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
|
||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||
|
||||
// common
|
||||
|
@ -777,21 +799,14 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
|||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||
|
||||
typedef enum {
|
||||
TASK_SCANHISTORY_CONT = 0x1,
|
||||
TASK_SCANHISTORY_QUIT = 0x2,
|
||||
TASK_SCANHISTORY_REXEC = 0x3,
|
||||
} EScanHistoryRet;
|
||||
|
||||
// source level
|
||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
EScanHistoryRet streamScanHistoryData(SStreamTask* pTask);
|
||||
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask);
|
||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||
|
||||
// agg level
|
||||
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
|
||||
SRpcHandleInfo* pRpcInfo);
|
||||
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
|
||||
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
|
||||
|
||||
// stream task meta
|
||||
|
|
|
@ -1105,6 +1105,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
|||
}
|
||||
}
|
||||
|
||||
static void ddxx() {
|
||||
|
||||
}
|
||||
|
||||
// this function should be executed by only one thread, so we set an sentinel to protect this function
|
||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
|
||||
|
@ -1168,18 +1172,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
EScanHistoryRet ret = streamScanHistoryData(pTask);
|
||||
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask);
|
||||
|
||||
// todo update the step1 exec elapsed time
|
||||
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
|
||||
|
||||
if (ret == TASK_SCANHISTORY_QUIT || ret == TASK_SCANHISTORY_REXEC) {
|
||||
if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
|
||||
|
||||
if (ret == TASK_SCANHISTORY_REXEC) {
|
||||
// todo wait for 100ms and retry
|
||||
streamStartScanHistoryAsync(pTask, 0);
|
||||
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
|
||||
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
|
||||
} else {
|
||||
char* p = NULL;
|
||||
ETaskStatus s = streamTaskGetStatus(pTask, &p);
|
||||
|
|
|
@ -25,7 +25,7 @@ typedef struct STableSinkInfo {
|
|||
tstr name;
|
||||
} STableSinkInfo;
|
||||
|
||||
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
|
||||
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
|
||||
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
|
||||
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
||||
SSubmitTbData* pTableData);
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||
static bool taskReadyForDataFromWal(SStreamTask* pTask);
|
||||
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
|
||||
|
||||
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
||||
int32_t tqScanWal(STQ* pTq) {
|
||||
|
@ -384,14 +386,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
||||
bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
||||
// non-source or fill-history tasks don't need to response the WAL scan action.
|
||||
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// not in ready state, do not handle the data from wal
|
||||
// int32_t status = pTask->status.taskStatus;
|
||||
char* p = NULL;
|
||||
int32_t status = streamTaskGetStatus(pTask, &p);
|
||||
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
|
||||
|
@ -423,7 +424,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
|
||||
bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t numOfNewItems = 0;
|
||||
|
||||
|
|
|
@ -140,6 +140,9 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
|
|||
void streamFreeQitem(SStreamQueueItem* data);
|
||||
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
|
||||
|
||||
int32_t onNormalTaskReady(SStreamTask* pTask);
|
||||
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
|
|||
info.msg.info = *pRpcInfo;
|
||||
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
stDebug("s-task:%s lock", pTask->id.idStr);
|
||||
|
||||
if (pTask->pRspMsgList == NULL) {
|
||||
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
|
||||
|
|
|
@ -187,7 +187,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
return code;
|
||||
}
|
||||
|
||||
EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
|
||||
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) {
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -201,7 +201,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
|
|||
if (streamTaskShouldPause(pTask)) {
|
||||
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
|
||||
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
|
||||
return TASK_SCANHISTORY_QUIT; // quit from step1, not continue to handle the step2
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; // quit from step1, not continue to handle the step2
|
||||
}
|
||||
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
|
@ -216,13 +216,13 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
|
|||
while (1) {
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return TASK_SCANHISTORY_QUIT;
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
|
||||
}
|
||||
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel);
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return TASK_SCANHISTORY_REXEC;
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
|
||||
}
|
||||
|
||||
SSDataBlock* output = NULL;
|
||||
|
@ -261,7 +261,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code));
|
||||
return TASK_SCANHISTORY_REXEC;
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
|
||||
}
|
||||
} else {
|
||||
taosArrayDestroy(pRes);
|
||||
|
@ -271,13 +271,11 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
|
|||
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
|
||||
stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr,
|
||||
pTask->info.fillHistory, pTask->info.taskLevel);
|
||||
|
||||
// todo exec scanhistory in 100ms
|
||||
return TASK_SCANHISTORY_REXEC;
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
|
||||
}
|
||||
}
|
||||
|
||||
return TASK_SCANHISTORY_CONT;
|
||||
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};;
|
||||
}
|
||||
|
||||
// wait for the stream task to be idle
|
||||
|
|
|
@ -19,6 +19,10 @@
|
|||
#include "wal.h"
|
||||
#include "streamsm.h"
|
||||
|
||||
#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
|
||||
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
|
||||
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
|
||||
|
||||
typedef struct SLaunchHTaskInfo {
|
||||
SStreamMeta* pMeta;
|
||||
STaskId id;
|
||||
|
@ -81,6 +85,50 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void doReExecScanhistory(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
pTask->schedHistoryInfo.numOfTicks -= 1;
|
||||
|
||||
char* p = NULL;
|
||||
ETaskStatus status = streamTaskGetStatus(pTask, &p);
|
||||
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
|
||||
}
|
||||
|
||||
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
|
||||
streamStartScanHistoryAsync(pTask, 0);
|
||||
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
|
||||
pTask->info.fillHistory, ref);
|
||||
} else {
|
||||
taosTmrReset(doReExecScanhistory, 100, pTask, NULL, pTask->schedHistoryInfo.pTimer);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
|
||||
if (numOfTicks <= 0) {
|
||||
numOfTicks = 1;
|
||||
} else if (numOfTicks > SCANHISTORY_IDLE_TICK) {
|
||||
numOfTicks = SCANHISTORY_IDLE_TICK;
|
||||
}
|
||||
|
||||
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
|
||||
|
||||
if (pTask->schedHistoryInfo.pTimer == NULL) {
|
||||
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
|
||||
} else {
|
||||
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, pTask->schedHistoryInfo.pTimer);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
|
||||
SVersionRange* pRange = &pTask->dataRange.range;
|
||||
if (pTask->info.fillHistory) {
|
||||
|
@ -684,9 +732,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
int32_t hTaskId = pHTaskInfo->id.taskId;
|
||||
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
stDebug(
|
||||
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
|
||||
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
|
||||
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
|
||||
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
|
||||
|
||||
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
|
Loading…
Reference in New Issue