Merge pull request #23548 from taosdata/fix/liaohj
refactor: do some internal refactor.
commit 666b8c531b
@@ -213,7 +213,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool    qStreamRecoverScanFinished(qTaskInfo_t tinfo);
bool    qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void    resetTaskInfo(qTaskInfo_t tinfo);
@@ -241,6 +241,24 @@ typedef struct {
  SEpSet epset;
} SDownstreamTaskEpset;

typedef enum {
  TASK_SCANHISTORY_CONT = 0x1,
  TASK_SCANHISTORY_QUIT = 0x2,
  TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;

typedef struct {
  EScanHistoryRet ret;
  int32_t idleTime;
} SScanhistoryDataInfo;

typedef struct {
  int32_t idleDuration; // idle time before using the time slice to continue the scan-history
  int32_t numOfTicks;
  tmr_h   pTimer;
  int32_t execCount;
} SScanhistorySchedInfo;

typedef struct {
  int64_t stbUid;
  char    stbFullName[TSDB_TABLE_FNAME_LEN];
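
The EScanHistoryRet/SScanhistoryDataInfo pair added above turns scan-history into a resumable, time-sliced job: each invocation reports whether to continue, quit, or re-execute after an idle period. A minimal sketch of how a caller is expected to branch on the result; the driver function is hypothetical, the enum values are copied from the diff:

#include <stdint.h>

typedef enum {
  TASK_SCANHISTORY_CONT  = 0x1,  // step1 finished, fall through to the next step
  TASK_SCANHISTORY_QUIT  = 0x2,  // task paused/stopped, give the time slice back
  TASK_SCANHISTORY_REXEC = 0x3,  // slice exhausted or inputQ blocked, retry later
} EScanHistoryRet;

typedef struct {
  EScanHistoryRet ret;
  int32_t         idleTime;  // suggested wait before re-execution, in ms
} SScanhistoryDataInfo;

/* hypothetical driver: how a caller might branch on one completed slice */
static void onScanHistorySlice(SScanhistoryDataInfo info) {
  switch (info.ret) {
    case TASK_SCANHISTORY_CONT:  /* proceed: step2 / WAL scan */             break;
    case TASK_SCANHISTORY_QUIT:  /* clear sentinel, release task, return */  break;
    case TASK_SCANHISTORY_REXEC: /* re-arm a timer after info.idleTime */    break;
  }
}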
@@ -354,7 +372,9 @@ typedef struct STaskExecStatisInfo {
  int64_t init;
  int64_t start;
  int64_t step1Start;
  double  step1El;
  int64_t step2Start;
  double  step2El;
  int32_t updateCount;
  int64_t latestUpdateTs;
  int32_t processDataBlocks;
@@ -378,9 +398,10 @@ typedef struct STaskOutputInfo {
  union {
    STaskDispatcherFixed   fixedDispatcher;
    STaskDispatcherShuffle shuffleDispatcher;
    STaskSinkTb            tbSink;
    STaskSinkSma           smaSink;
    STaskSinkFetch         fetchSink;

    STaskSinkTb    tbSink;
    STaskSinkSma   smaSink;
    STaskSinkFetch fetchSink;
  };
  int8_t type;
  STokenBucket* pTokenBucket;
@@ -414,7 +435,10 @@ struct SStreamTask {
  SStreamState* pState; // state backend
  SArray*       pRspMsgList;
  SUpstreamInfo upstreamInfo;

  // the following attributes are not serialized
  SScanhistorySchedInfo schedHistoryInfo;

  int32_t notReadyTasks;
  int32_t numOfWaitingUpstream;
  int64_t checkReqId;
@@ -432,8 +456,10 @@ struct SStreamTask {
typedef struct STaskStartInfo {
  int64_t startTs;
  int64_t readyTs;
  int32_t startAllTasksFlag;
  SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
  int32_t tasksWillRestart;
  int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
  SHashObj* pReadyTaskSet;  // tasks that are all ready for running stream processing
  SHashObj* pFailedTaskSet; // tasks that have finished the check-downstream process, successfully or not
  int32_t elapsedTime;
} STaskStartInfo;
@@ -732,8 +758,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)

// recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);

int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
@@ -755,7 +779,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);

int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);

// common
@@ -778,12 +804,11 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamScanHistoryData(SStreamTask* pTask);
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);

// agg level
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
                                          SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);

// stream task meta
@@ -805,11 +830,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);

// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
@@ -43,9 +43,9 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore;

// tqPush
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)

// tqExec
typedef struct {
@@ -155,9 +155,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);

// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);
@@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqLaunchStreamTaskAsync(STQ* pTq);

int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);

int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
@@ -1061,7 +1061,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
  return code;
}

static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
  const char* id = pTask->id.idStr;
  int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
@@ -1102,7 +1102,7 @@ static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq)
  }
}

// this function should be executed by only one thread
// this function should be executed by only one thread, so we set a sentinel to protect this function
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
  SStreamMeta* pMeta = pTq->pStreamMeta;
@@ -1131,6 +1131,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
    }
  }

  // let's decide which step should be executed now
  if (pTask->execInfo.step1Start == 0) {
    ASSERT(pTask->status.pauseAllowed == false);
    int64_t ts = taosGetTimestampMs();
@@ -1144,9 +1145,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
    }
  } else {
    if (pTask->execInfo.step2Start == 0) {
      tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start);
      tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
              id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
    } else {
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start);
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
              pTask->execInfo.step2Start);

      atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
      streamMetaReleaseTask(pMeta, pTask);
      return 0;
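
tqProcessTaskScanHistory must run at most once per task at a time; inScanHistorySentinel is the flag the function sets on entry and clears (as above) on every exit path. A minimal sketch of that pattern with C11 atomics; the TDengine atomic_* wrappers behave similarly, and the function names here are hypothetical:

#include <stdatomic.h>
#include <stdbool.h>

static atomic_int inScanHistorySentinel = 0;

/* try to become the only executor; false if another thread holds the sentinel */
static bool enterScanHistory(void) {
  int expected = 0;
  return atomic_compare_exchange_strong(&inScanHistorySentinel, &expected, 1);
}

/* mirrors atomic_store_32(&pTask->status.inScanHistorySentinel, 0) in the diff */
static void leaveScanHistory(void) {
  atomic_store(&inScanHistorySentinel, 0);
}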
@@ -1164,20 +1168,37 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
    return 0;
  }

  streamScanHistoryData(pTask);
  int64_t st = taosGetTimestampMs();
  SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);

  double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
  if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) {
  double el = (taosGetTimestampMs() - st) / 1000.0;
  pTask->execInfo.step1El += el;

  if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
    tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);

    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);

    if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
      streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
    } else {
      char* p = NULL;
      ETaskStatus s = streamTaskGetStatus(pTask, &p);

      if (s == TASK_STATUS__PAUSE) {
        tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
                el, pTask->execInfo.step1El, status);
      } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
        tqDebug("s-task:%s status:%s not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p,
                pTask->execInfo.step1El);
      }
    }

    streamMetaReleaseTask(pMeta, pTask);
    return 0;
  }

  // the following procedure should be executed regardless of whether the status is stop/pause or not
  tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el);
  tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);

  if (pTask->info.fillHistory) {
    SStreamTask* pStreamTask = NULL;
@@ -1200,23 +1221,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {

    code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
    if (code == TSDB_CODE_SUCCESS) {
      doStartStep2(pTask, pStreamTask, pTq);
      doStartFillhistoryStep2(pTask, pStreamTask, pTq);
    } else {
      tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
    }

    streamMetaReleaseTask(pMeta, pStreamTask);

  } else {
    STimeWindow* pWindow = &pTask->dataRange.window;
    ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));

    // Not update the fill-history time window until the state transfer is completed if the related fill-history task
    // exists.
    tqDebug(
        "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64
        ", window:%" PRId64 " - %" PRId64,
        id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
    // Do not update the fill-history time window until the state transfer is completed.
    tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
            ", window:%" PRId64 " - %" PRId64,
            id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);

    code = streamTaskScanHistoryDataComplete(pTask);
  }
@@ -1294,14 +1312,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t taskId = pReq->taskId;
  int32_t vgId = TD_VID(pTq->pVnode);

  if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
    tqStartStreamTask(pTq);
    return 0;
  }

  if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks extract submit data from the WAL
    tqScanWal(pTq);
    return 0;
  } else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
    tqStartStreamTasks(pTq);
    return 0;
  } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
    tqRestartStreamTasks(pTq);
    return 0;
  }

  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
@@ -1489,6 +1508,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
      streamSchedExec(pTask);
    }
  } else if (status == TASK_STATUS__UNINIT) {
    // todo: fill-history task init ?
    if (pTask->info.fillHistory == 0) {
      EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
      streamTaskHandleEvent(pTask->status.pSM, event);
@@ -1887,7 +1907,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);

  pMeta->startInfo.startAllTasksFlag = 1;
  pMeta->startInfo.tasksWillRestart = 1;

  if (updateTasks < numOfTasks) {
    tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@@ -1896,45 +1916,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
  } else {
    if (!pTq->pVnode->restored) {
      tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
      pMeta->startInfo.startAllTasksFlag = 0;
      pMeta->startInfo.tasksWillRestart = 0;
      streamMetaWUnLock(pMeta);
    } else {
      tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
      terrno = 0;

      streamMetaWUnLock(pMeta);

      while (streamMetaTaskInTimer(pMeta)) {
        tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
        taosMsleep(100);
      }

      streamMetaWLock(pMeta);

      int32_t code = streamMetaReopen(pMeta);
      if (code != 0) {
        tqError("vgId:%d failed to reopen stream meta", vgId);
        streamMetaWUnLock(pMeta);
        taosArrayDestroy(req.pNodeList);
        return -1;
      }

      if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
        tqError("vgId:%d failed to load stream tasks", vgId);
        streamMetaWUnLock(pMeta);
        taosArrayDestroy(req.pNodeList);
        return -1;
      }

      if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
        tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
        tqResetStreamTaskStatus(pTq);
        tqLaunchStreamTaskAsync(pTq);
      } else {
        tqInfo("vgId:%d, follower node not start stream tasks", vgId);
      }

      streamMetaWUnLock(pMeta);
      tqStartStreamTaskAsync(pTq, true);
    }
  }
@@ -25,7 +25,7 @@ typedef struct STableSinkInfo {
  tstr name;
} STableSinkInfo;

static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
                                  SSubmitTbData* pTableData);
@@ -22,6 +22,8 @@
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);

// extract data blocks (submit/delete) from WAL, and add them into the input queue for all the source tasks.
int32_t tqScanWal(STQ* pTq) {
@@ -58,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) {
  return 0;
}

int32_t tqStartStreamTask(STQ* pTq) {
int32_t tqStartStreamTasks(STQ* pTq) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
@@ -73,6 +75,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
  streamMetaWLock(pMeta);
  pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.startTs = taosGetTimestampMs();
  streamMetaWUnLock(pMeta);
@@ -97,7 +100,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
      streamLaunchFillHistoryTask(pTask);
    }

    streamMetaUpdateTaskReadyInfo(pTask);
    streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
    streamMetaReleaseTask(pMeta, pTask);
    continue;
  }
@@ -115,7 +118,70 @@ int32_t tqStartStreamTask(STQ* pTq) {
  return code;
}

int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
int32_t tqRestartStreamTasks(STQ* pTq) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  int32_t vgId = pMeta->vgId;
  int32_t code = 0;
  int64_t st = taosGetTimestampMs();

  while(1) {
    int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
    if (startVal == 0) {
      break;
    }

    tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
    taosMsleep(500);
  }

  terrno = 0;
  tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
         pMeta->updateInfo.transId);

  while (streamMetaTaskInTimer(pMeta)) {
    tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
    taosMsleep(100);
  }

  streamMetaWLock(pMeta);

  code = streamMetaReopen(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d failed to reopen stream meta", vgId);
    streamMetaWUnLock(pMeta);
    code = terrno;
    return code;
  }

  int64_t el = taosGetTimestampMs() - st;

  tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);

  code = streamMetaLoadAllTasks(pTq->pStreamMeta);
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
    streamMetaWUnLock(pMeta);
    code = terrno;
    return code;
  }

  if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
    tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
    tqResetStreamTaskStatus(pTq);

    streamMetaWUnLock(pMeta);
    tqStartStreamTasks(pTq);
  } else {
    streamMetaResetStartInfo(&pMeta->startInfo);
    streamMetaWUnLock(pMeta);
    tqInfo("vgId:%d, follower node not start stream tasks", vgId);
  }

  code = terrno;
  return code;
}

int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  int32_t vgId = pMeta->vgId;
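
tqRestartStreamTasks serializes itself against any in-flight start procedure by spinning on the taskStarting flag with a 500ms backoff before it reopens the meta store. The same guard in portable C11 atomics, as a sketch; taosMsleep is replaced by a standard sleep and the function name is hypothetical:

#include <stdatomic.h>
#include <unistd.h>

static atomic_int taskStarting = 0;

/* block until we own the start procedure; only one restarter may proceed */
static void waitToBecomeStarter(void) {
  int expected = 0;
  /* atomic_val_compare_exchange_32(&flag, 0, 1) in the diff returns the old
     value; an old value of 0 means we won the exchange */
  while (!atomic_compare_exchange_strong(&taskStarting, &expected, 1)) {
    expected = 0;        /* compare_exchange rewrites expected on failure */
    usleep(500 * 1000);  /* wait for 500ms and recheck, as in the diff */
  }
}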
@@ -132,10 +198,10 @@ int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
    return -1;
  }

  tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks);
  tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
  pRunReq->head.vgId = vgId;
  pRunReq->streamId = 0;
  pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID;
  pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
  tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
@@ -320,14 +386,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
  return false;
}

static bool taskReadyForDataFromWal(SStreamTask* pTask) {
bool taskReadyForDataFromWal(SStreamTask* pTask) {
  // non-source or fill-history tasks don't need to respond to the WAL scan action.
  if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
    return false;
  }

  // not in ready state, do not handle the data from wal
  // int32_t status = pTask->status.taskStatus;
  char* p = NULL;
  int32_t status = streamTaskGetStatus(pTask, &p);
  if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
@@ -359,7 +424,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
  return true;
}

static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
  const char* id = pTask->id.idStr;
  int32_t numOfNewItems = 0;
@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  streamMetaWLock(pMeta);

  if (pMeta->startInfo.startAllTasksFlag) {
  if (pMeta->startInfo.tasksWillRestart) {
    vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
    streamMetaWUnLock(pMeta);
    return;
@@ -567,7 +567,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
    } else {
      vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
      tqResetStreamTaskStatus(pVnode->pTq);
      tqLaunchStreamTaskAsync(pVnode->pTq);
      tqStartStreamTaskAsync(pVnode->pTq, false);
    }
  } else {
    vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
@@ -1046,7 +1046,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
  }
}

bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverScanFinished;
}
@@ -127,13 +127,11 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);

int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);

void streamMetaResetStartInfo(STaskStartInfo* pMeta);

SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
@@ -142,6 +140,9 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);

int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);

#ifdef __cplusplus
}
#endif
@@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
  info.msg.info = *pRpcInfo;

  taosThreadMutexLock(&pTask->lock);
  stDebug("s-task:%s lock", pTask->id.idStr);

  if (pTask->pRspMsgList == NULL) {
    pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
@@ -1107,8 +1106,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
    taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
    taosThreadMutexUnlock(&pTask->lock);

    stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
            pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
    stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
           pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
  } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
    stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
            pRsp->downstreamTaskId, pRsp->downstreamNodeId);
@@ -1148,8 +1147,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
    }

    int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d",
            pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
    stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d",
            pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref);

    streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
  } else { // this message has been sent successfully, let's try next one.
@@ -18,7 +18,8 @@
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define STREAM_RESULT_DUMP_THRESHOLD 300
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms

static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@@ -48,10 +49,9 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
    }

    streamDispatchStreamBlock(pTask);
    return code;
  }

  return 0;
  return code;
}

static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
@@ -187,83 +187,118 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
  return code;
}

int32_t streamScanHistoryData(SStreamTask* pTask) {
static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (taosArrayGetSize(pRes) > 0) {
    SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
    code = doOutputResultBlockImpl(pTask, pStreamBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
    }
  } else {
    taosArrayDestroy(pRes);
  }
  return code;
}

static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
  int32_t code = TSDB_CODE_SUCCESS;
  void* exec = pTask->exec.pExecutor;
  int32_t numOfBlocks = 0;

  while (1) {
    if (streamTaskShouldStop(pTask)) {
      break;
    }

    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
      break;
    }

    SSDataBlock* output = NULL;
    uint64_t ts = 0;
    code = qExecTask(exec, &output, &ts);
    if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
              tstrerror(code));
      continue;
    }

    // the generated results before fill-history task been paused, should be dispatched to sink node
    if (output == NULL) {
      (*pFinish) = qStreamScanhistoryFinished(exec);
      break;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->info.selfChildId;
    taosArrayPush(pRes, &block);

    (*pSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
      stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
              pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
              SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
      break;
    }
  }
}

SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  int32_t code = TSDB_CODE_SUCCESS;
  void* exec = pTask->exec.pExecutor;
  bool finished = false;

  qSetStreamOpOpen(exec);

  while (!finished) {
  while (1) {
    if (streamTaskShouldPause(pTask)) {
      double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0;
      stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
      break;
      stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr);
      // quit from step1, not continue to handle the step2
      return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
    }

    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
      stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr,
              tstrerror(terrno));
      continue;
    }

    int32_t size = 0;
    int32_t numOfBlocks = 0;
    while (1) {
      if (streamTaskShouldStop(pTask)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }
    streamScanHistoryDataImpl(pTask, pRes, &size, &finished);

      if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
        stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
        taosMsleep(10000);
        continue;
      }

      SSDataBlock* output = NULL;
      uint64_t ts = 0;
      code = qExecTask(exec, &output, &ts);
      if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
        stError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
        continue;
      }

      // the generated results before fill-history task been paused, should be dispatched to sink node
      if (output == NULL) {
        finished = qStreamRecoverScanFinished(exec);
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->info.selfChildId;
      taosArrayPush(pRes, &block);

      size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);

      if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
        stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
                pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
                SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
        break;
      }
    if(streamTaskShouldStop(pTask)) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
    }

    if (taosArrayGetSize(pRes) > 0) {
      SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
      code = doOutputResultBlockImpl(pTask, pStreamBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    } else {
      taosArrayDestroy(pRes);
    // dispatch the generated results
    int32_t code = handleResultBlocks(pTask, pRes, size);

    int64_t el = taosGetTimestampMs() - st;

    // downstream task input queue is full, try in 5sec
    if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
      return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
    }

    if (finished) {
      return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
    }

    if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
      stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms",
              pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
      return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
    }
  }

  return 0;
}
@@ -273,7 +308,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  int64_t st = taosGetTimestampMs();
  while (!streamTaskIsIdle(pStreamTask)) {
    stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
        pStreamTask->id.idStr);
            pStreamTask->id.idStr);
    taosMsleep(100);
  }
@@ -150,6 +150,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF

  pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
  if (pMeta->startInfo.pReadyTaskSet == NULL) {
    goto _err;
  }

  pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
  if (pMeta->startInfo.pFailedTaskSet == NULL) {
    goto _err;
  }

  pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
@@ -221,6 +227,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  taosMemoryFree(pMeta);

  stError("failed to open stream meta");
@@ -228,12 +235,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
}

int32_t streamMetaReopen(SStreamMeta* pMeta) {
  // backup the restart flag
  int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
  streamMetaClear(pMeta);

  pMeta->startInfo.startAllTasksFlag = restartFlag;

  // NOTE: role should not be changed during reopen meta
  pMeta->streamBackendRid = -1;
  pMeta->streamBackend = NULL;
@@ -302,7 +305,10 @@ void streamMetaClear(SStreamMeta* pMeta) {
  pMeta->numOfPausedTasks = 0;
  pMeta->chkptNotReadyTasks = 0;

  streamMetaResetStartInfo(&pMeta->startInfo);
  // the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.readyTs = 0;
}

void streamMetaClose(SStreamMeta* pMeta) {
@@ -342,6 +348,7 @@ void streamMetaCloseImpl(void* arg) {
  taosHashCleanup(pMeta->pTaskBackendUnique);
  taosHashCleanup(pMeta->updateInfo.pTasks);
  taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  taosHashCleanup(pMeta->startInfo.pFailedTaskSet);

  taosMemoryFree(pMeta->pHbInfo);
  taosMemoryFree(pMeta->path);
@@ -1093,8 +1100,11 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {

void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
  taosHashClear(pStartInfo->pReadyTaskSet);
  pStartInfo->startAllTasksFlag = 0;
  taosHashClear(pStartInfo->pFailedTaskSet);
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->readyTs = 0;
  // reset the sentinel flag value to be 0
  atomic_store_32(&pStartInfo->taskStarting, 0);
}

void streamMetaRLock(SStreamMeta* pMeta) {
@@ -1104,7 +1114,6 @@ void streamMetaRLock(SStreamMeta* pMeta) {
void streamMetaRUnLock(SStreamMeta* pMeta) {
  stTrace("vgId:%d meta-runlock", pMeta->vgId);
  taosRUnLockLatch(&pMeta->lock);

}
void streamMetaWLock(SStreamMeta* pMeta) {
  stTrace("vgId:%d meta-wlock", pMeta->vgId);
@@ -159,7 +159,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu

  // no available token in bucket for sink task, let's wait for a little bit
  if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
    stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id);
    stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
    taosMsleep(10);
    return TSDB_CODE_SUCCESS;
  }
@@ -340,10 +341,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
  return 0;
}

// the result should be put into the outputQ in any cases, otherwise, the result may be lost
// the result should be put into the outputQ in any cases, the result may be lost otherwise.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  STaosQueue* pQueue = pTask->outputq.queue->pQueue;

  // wait until the output queue is available for new data to dispatch
  while (streamQueueIsFull(pTask->outputq.queue)) {
    if (streamTaskShouldStop(pTask)) {
      stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
@@ -373,7 +375,8 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
  return TSDB_CODE_SUCCESS;
}

int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) {
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
                                  const char* id) {
  if (numCap < 10 || numRate < 10 || pBucket == NULL) {
    stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
    return TSDB_CODE_INVALID_PARA;
@@ -388,6 +391,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
  pBucket->quotaRemain = pBucket->quotaCapacity;

  pBucket->fillTimestamp = taosGetTimestampMs();
  stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
  return TSDB_CODE_SUCCESS;
}
@@ -406,12 +410,12 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
  double incSize = (delta / 1000.0) * pBucket->quotaRate;
  if (incSize > 0) {
    pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
    pBucket->fillTimestamp = now;
  }

  if (incNum > 0 || incSize > 0) {
    stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
            " idle for %.2f Sec, %s",
            pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);
    stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s",
            pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id);
  }
}
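
fillTokenBucket tops the quota up in proportion to how long the bucket sat idle, then caps it at capacity. The refill arithmetic isolated as a standalone sketch; field names follow the diff, the struct and function are hypothetical stand-ins:

#include <stdint.h>

typedef struct {
  double  quotaRate;      // MiB added per second
  double  quotaRemain;    // MiB currently available
  double  quotaCapacity;  // upper bound for quotaRemain
  int64_t fillTimestamp;  // last refill time, ms
} SQuotaBucket;

static void refillQuota(SQuotaBucket* b, int64_t nowMs) {
  int64_t delta   = nowMs - b->fillTimestamp;
  double  incSize = (delta / 1000.0) * b->quotaRate;  // idle seconds * rate
  if (incSize > 0) {
    b->quotaRemain = (b->quotaRemain + incSize < b->quotaCapacity)
                         ? b->quotaRemain + incSize
                         : b->quotaCapacity;  // TMIN(..., capacity) in the diff
    b->fillTimestamp = nowMs;
  }
}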
@@ -19,6 +19,10 @@
#include "wal.h"
#include "streamsm.h"

#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)

typedef struct SLaunchHTaskInfo {
  SStreamMeta* pMeta;
  STaskId id;
@@ -30,6 +34,12 @@ typedef struct STaskRecheckInfo {
  void* checkTimer;
} STaskRecheckInfo;

typedef struct STaskInitTs {
  int64_t start;
  int64_t end;
  bool success;
} STaskInitTs;

static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
@@ -57,7 +67,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
  stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
          pTask->id.idStr, numOfDowns, el, p);

  streamMetaUpdateTaskReadyInfo(pTask);
  streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
  return TSDB_CODE_SUCCESS;
}
@@ -81,6 +91,60 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
  return 0;
}

static void doReExecScanhistory(void* param, void* tmrId) {
  SStreamTask* pTask = param;
  pTask->schedHistoryInfo.numOfTicks -= 1;

  char* p = NULL;
  ETaskStatus status = streamTaskGetStatus(pTask, &p);
  if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
    streamMetaReleaseTask(pTask->pMeta, pTask);
    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
    return;
  }

  if (pTask->schedHistoryInfo.numOfTicks <= 0) {
    streamStartScanHistoryAsync(pTask, 0);

    int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
            pTask->info.fillHistory, ref);

    // release the task.
    streamMetaReleaseTask(pTask->pMeta, pTask);
  } else {
    taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer,
                 &pTask->schedHistoryInfo.pTimer);
  }
}

int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
  int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
  if (numOfTicks <= 0) {
    numOfTicks = 1;
  } else if (numOfTicks > SCANHISTORY_IDLE_TICK) {
    numOfTicks = SCANHISTORY_IDLE_TICK;
  }

  // add ref for task
  SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
  ASSERT(p != NULL);

  pTask->schedHistoryInfo.numOfTicks = numOfTicks;

  int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
  stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);

  if (pTask->schedHistoryInfo.pTimer == NULL) {
    pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
  } else {
    taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer);
  }

  return TSDB_CODE_SUCCESS;
}

static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
  SVersionRange* pRange = &pTask->dataRange.range;
  if (pTask->info.fillHistory) {
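
streamReExecScanHistoryFuture converts the requested idle duration into 100ms timer ticks, clamped between one tick and a 10-second ceiling, and doReExecScanhistory counts those ticks down on every timer fire. The clamping isolated as a sketch; constants are copied from the diff, the function is a hypothetical stand-in:

#include <stdint.h>

#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms per timer tick
#define SCANHISTORY_MAX_IDLE_TIME 10    // 10 sec upper bound
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)

/* map an idle duration in ms to a tick count in [1, SCANHISTORY_IDLE_TICK] */
static int32_t idleDurationToTicks(int32_t idleDuration) {
  int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
  if (numOfTicks <= 0) return 1;                                  // at least one tick
  if (numOfTicks > SCANHISTORY_IDLE_TICK) return SCANHISTORY_IDLE_TICK;
  return numOfTicks;  // e.g. idleDuration=5000 -> 50 ticks of 100ms
}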
@@ -318,6 +382,31 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
  streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
}

static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
  int32_t vgId = pTask->pMeta->vgId;

  taosThreadMutexLock(&pTask->lock);
  int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
  bool existed = false;
  for (int i = 0; i < num; ++i) {
    SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
    if (p->nodeId == nodeId) {
      existed = true;
      break;
    }
  }

  if (!existed) {
    SDownstreamTaskEpset t = {.nodeId = nodeId};
    taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);

    stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
           t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
  }

  taosThreadMutexUnlock(&pTask->lock);
}

int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
  ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
  const char* id = pTask->id.idStr;
@@ -367,40 +456,23 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
      doProcessDownstreamReadyRsp(pTask);
    }
  } else { // not ready, wait for 100ms and retry
    if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
      stError(
          "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
          "not check wait for downstream task nodeUpdate, and all tasks restart",
          id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
    } else {
      if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
    if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
      if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
        stError(
            "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
            "not check wait for downstream task nodeUpdate, and all tasks restart",
            id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
      } else {
        stError(
            "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
            "downstream again, nodeUpdate needed",
            id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);

        taosThreadMutexLock(&pTask->lock);
        int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
        bool existed = false;
        for (int i = 0; i < num; ++i) {
          SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
          if (p->nodeId == pRsp->downstreamNodeId) {
            existed = true;
            break;
          }
        }

        if (!existed) {
          SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
          taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
          stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
                 t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
        }

        taosThreadMutexUnlock(&pTask->lock);
        return 0;
      }

      addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
      streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);

    } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
      STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);

      int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
@@ -676,9 +748,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
    int32_t hTaskId = pHTaskInfo->id.taskId;

    streamTaskGetStatus(pTask, &p);
    stDebug(
        "s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
        pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
    stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
            pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);

    taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
    streamMetaReleaseTask(pMeta, pTask);
@@ -975,28 +1046,57 @@ void streamTaskEnablePause(SStreamTask* pTask) {
  pTask->status.pauseAllowed = 1;
}

int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) {
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  int32_t vgId = pMeta->vgId;
  void* pIter = NULL;
  size_t keyLen = 0;

  stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
         succ ? "success" : "failed");

  while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
    STaskInitTs* pInfo = pIter;
    void* key = taosHashGetKey(pIter, &keyLen);

    SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
    if (pTask1 == NULL) {
      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
    } else {
      stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
             (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
    }
  }
}

int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
  SStreamMeta* pMeta = pTask->pMeta;

  streamMetaWLock(pMeta);

  STaskId id = streamTaskExtractKey(pTask);
  taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0);
  STaskStartInfo* pStartInfo = &pMeta->startInfo;

  SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet;

  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
  taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);

  if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) {
    STaskStartInfo* pStartInfo = &pMeta->startInfo;

  if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
    pStartInfo->readyTs = pTask->execInfo.start;
    pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;

    streamMetaResetStartInfo(pStartInfo);

    stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
    stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
            ", readyTs:%" PRId64 " total elapsed time:%.2fs",
            pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
            pStartInfo->elapsedTime / 1000.0);

    // print the initialization elapsed time and info
    displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
    displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);

    streamMetaResetStartInfo(pStartInfo);
  }

  streamMetaWUnLock(pMeta);
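
With the pFailedTaskSet added, start-up is considered complete once every task has landed in either the ready set or the failed set, not only the ready set as before. The completion predicate and the per-task record, as a sketch; STaskInitTs is copied from the diff, the helper is a hypothetical stand-in for the taosHashGetSize comparison:

#include <stdbool.h>
#include <stdint.h>

typedef struct STaskInitTs {
  int64_t start;    // check-downstream start ts
  int64_t end;      // check-downstream end ts
  bool    success;  // true -> ready set, false -> failed set
} STaskInitTs;

/* startup finished once every task reported an outcome, success or not */
static bool startupComplete(int32_t numReady, int32_t numFailed, int32_t numTotal) {
  return numReady + numFailed == numTotal;
}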
@@ -400,8 +400,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
  taosMemoryFree(pTask->outputInfo.pTokenBucket);
  taosThreadMutexDestroy(&pTask->lock);

  taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
  pTask->outputInfo.pDownstreamUpdateList = NULL;
  pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);

  taosMemoryFree(pTask);
  stDebug("s-task:0x%x free task completed", taskId);
@@ -447,7 +446,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i

  // 2MiB per second for sink task
  // 50 times sink operator per second
  streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate);
  streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);

  TdThreadMutexAttr attr = {0};
  int code = taosThreadMutexAttrInit(&attr);
@@ -315,7 +315,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
            GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));

    taosThreadMutexUnlock(&pTask->lock);
    stDebug("s-task:%s unlockx", pTask->id.idStr);
    return TSDB_CODE_STREAM_INVALID_STATETRANS;
  }