fix(stream): exec scan-history in timer.

This commit is contained in:
Haojun Liao 2023-11-06 18:55:53 +08:00
parent 6b1889284b
commit d9474016fc
8 changed files with 102 additions and 36 deletions

View File

@ -241,6 +241,24 @@ typedef struct {
SEpSet epset; SEpSet epset;
} SDownstreamTaskEpset; } SDownstreamTaskEpset;
typedef enum {
TASK_SCANHISTORY_CONT = 0x1,
TASK_SCANHISTORY_QUIT = 0x2,
TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;
typedef struct {
EScanHistoryRet ret;
int32_t idleTime;
} SScanhistoryDataInfo;
typedef struct {
int32_t idleDuration; // idle time before use time slice the continue execute scan-history
int32_t numOfTicks;
tmr_h pTimer;
int32_t execCount;
} SScanhistorySchedInfo;
typedef struct { typedef struct {
int64_t stbUid; int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN]; char stbFullName[TSDB_TABLE_FNAME_LEN];
@ -378,9 +396,10 @@ typedef struct STaskOutputInfo {
union { union {
STaskDispatcherFixed fixedDispatcher; STaskDispatcherFixed fixedDispatcher;
STaskDispatcherShuffle shuffleDispatcher; STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink; STaskSinkTb tbSink;
STaskSinkFetch fetchSink; STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
}; };
int8_t type; int8_t type;
STokenBucket* pTokenBucket; STokenBucket* pTokenBucket;
@ -414,7 +433,10 @@ struct SStreamTask {
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList; SArray* pRspMsgList;
SUpstreamInfo upstreamInfo; SUpstreamInfo upstreamInfo;
// the followings attributes don't be serialized // the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
int32_t notReadyTasks; int32_t notReadyTasks;
int32_t numOfWaitingUpstream; int32_t numOfWaitingUpstream;
int64_t checkReqId; int64_t checkReqId;
@ -734,8 +756,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)
// recover and fill history // recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
@ -757,7 +777,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common // common
@ -777,21 +799,14 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
typedef enum {
TASK_SCANHISTORY_CONT = 0x1,
TASK_SCANHISTORY_QUIT = 0x2,
TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
EScanHistoryRet streamScanHistoryData(SStreamTask* pTask); SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level // agg level
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta

View File

@ -1102,6 +1102,10 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
} }
} }
static void ddxx() {
}
// this function should be executed by only one thread, so we set an sentinel to protect this function // this function should be executed by only one thread, so we set an sentinel to protect this function
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
@ -1165,18 +1169,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
EScanHistoryRet ret = streamScanHistoryData(pTask); SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask);
// todo update the step1 exec elapsed time // todo update the step1 exec elapsed time
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
if (ret == TASK_SCANHISTORY_QUIT || ret == TASK_SCANHISTORY_REXEC) { if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
int8_t status = streamTaskSetSchedStatusInactive(pTask); int8_t status = streamTaskSetSchedStatusInactive(pTask);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
if (ret == TASK_SCANHISTORY_REXEC) { if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
// todo wait for 100ms and retry streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
streamStartScanHistoryAsync(pTask, 0);
} else { } else {
char* p = NULL; char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p); ETaskStatus s = streamTaskGetStatus(pTask, &p);

View File

@ -25,7 +25,7 @@ typedef struct STableSinkInfo {
tstr name; tstr name;
} STableSinkInfo; } STableSinkInfo;
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData); SSubmitTbData* pTableData);

View File

@ -22,6 +22,8 @@
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) { int32_t tqScanWal(STQ* pTq) {
@ -384,14 +386,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
return false; return false;
} }
static bool taskReadyForDataFromWal(SStreamTask* pTask) { bool taskReadyForDataFromWal(SStreamTask* pTask) {
// non-source or fill-history tasks don't need to response the WAL scan action. // non-source or fill-history tasks don't need to response the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
return false; return false;
} }
// not in ready state, do not handle the data from wal // not in ready state, do not handle the data from wal
// int32_t status = pTask->status.taskStatus;
char* p = NULL; char* p = NULL;
int32_t status = streamTaskGetStatus(pTask, &p); int32_t status = streamTaskGetStatus(pTask, &p);
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) { if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
@ -423,7 +424,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
return true; return true;
} }
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0; int32_t numOfNewItems = 0;

View File

@ -140,6 +140,9 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data); void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
info.msg.info = *pRpcInfo; info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pTask->pRspMsgList == NULL) { if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));

View File

@ -187,7 +187,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code; return code;
} }
EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) { SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -201,7 +201,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return TASK_SCANHISTORY_QUIT; // quit from step1, not continue to handle the step2 return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; // quit from step1, not continue to handle the step2
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
@ -216,13 +216,13 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
while (1) { while (1) {
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TASK_SCANHISTORY_QUIT; return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
} }
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TASK_SCANHISTORY_REXEC; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
} }
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
@ -261,7 +261,7 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code)); stDebug("s-task:%s dump fill-history results failed, code:%s, retry in 100ms", pTask->id.idStr, tstrerror(code));
return TASK_SCANHISTORY_REXEC; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
} }
} else { } else {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
@ -271,13 +271,11 @@ EScanHistoryRet streamScanHistoryData(SStreamTask* pTask) {
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr,
pTask->info.fillHistory, pTask->info.taskLevel); pTask->info.fillHistory, pTask->info.taskLevel);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
// todo exec scanhistory in 100ms
return TASK_SCANHISTORY_REXEC;
} }
} }
return TASK_SCANHISTORY_CONT; return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};;
} }
// wait for the stream task to be idle // wait for the stream task to be idle

View File

@ -19,6 +19,10 @@
#include "wal.h" #include "wal.h"
#include "streamsm.h" #include "streamsm.h"
#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
typedef struct SLaunchHTaskInfo { typedef struct SLaunchHTaskInfo {
SStreamMeta* pMeta; SStreamMeta* pMeta;
STaskId id; STaskId id;
@ -81,6 +85,50 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0; return 0;
} }
static void doReExecScanhistory(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
} else {
taosTmrReset(doReExecScanhistory, 100, pTask, NULL, pTask->schedHistoryInfo.pTimer);
}
}
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
numOfTicks = 1;
} else if (numOfTicks > SCANHISTORY_IDLE_TICK) {
numOfTicks = SCANHISTORY_IDLE_TICK;
}
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, pTask->schedHistoryInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
}
static int32_t doStartScanHistoryTask(SStreamTask* pTask) { static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range; SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -684,9 +732,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
int32_t hTaskId = pHTaskInfo->id.taskId; int32_t hTaskId = pHTaskInfo->id.taskId;
streamTaskGetStatus(pTask, &p); streamTaskGetStatus(pTask, &p);
stDebug( stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);