fix:merge from 3.0

This commit is contained in:
yihaoDeng 2023-11-08 10:45:15 +08:00
commit 7ada7f34ac
28 changed files with 578 additions and 343 deletions

View File

@ -184,11 +184,7 @@ void qDestroyTask(qTaskInfo_t tinfo);
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
@ -217,7 +213,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo);
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo); void resetTaskInfo(qTaskInfo_t tinfo);

View File

@ -241,6 +241,24 @@ typedef struct {
SEpSet epset; SEpSet epset;
} SDownstreamTaskEpset; } SDownstreamTaskEpset;
typedef enum {
TASK_SCANHISTORY_CONT = 0x1,
TASK_SCANHISTORY_QUIT = 0x2,
TASK_SCANHISTORY_REXEC = 0x3,
} EScanHistoryRet;
typedef struct {
EScanHistoryRet ret;
int32_t idleTime;
} SScanhistoryDataInfo;
typedef struct {
int32_t idleDuration; // idle time before use time slice the continue execute scan-history
int32_t numOfTicks;
tmr_h pTimer;
int32_t execCount;
} SScanhistorySchedInfo;
typedef struct { typedef struct {
int64_t stbUid; int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN]; char stbFullName[TSDB_TABLE_FNAME_LEN];
@ -354,7 +372,9 @@ typedef struct STaskExecStatisInfo {
int64_t init; int64_t init;
int64_t start; int64_t start;
int64_t step1Start; int64_t step1Start;
double step1El;
int64_t step2Start; int64_t step2Start;
double step2El;
int32_t updateCount; int32_t updateCount;
int64_t latestUpdateTs; int64_t latestUpdateTs;
int32_t processDataBlocks; int32_t processDataBlocks;
@ -378,9 +398,10 @@ typedef struct STaskOutputInfo {
union { union {
STaskDispatcherFixed fixedDispatcher; STaskDispatcherFixed fixedDispatcher;
STaskDispatcherShuffle shuffleDispatcher; STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink; STaskSinkTb tbSink;
STaskSinkFetch fetchSink; STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
}; };
int8_t type; int8_t type;
STokenBucket* pTokenBucket; STokenBucket* pTokenBucket;
@ -414,7 +435,10 @@ struct SStreamTask {
SStreamState* pState; // state backend SStreamState* pState; // state backend
SArray* pRspMsgList; SArray* pRspMsgList;
SUpstreamInfo upstreamInfo; SUpstreamInfo upstreamInfo;
// the followings attributes don't be serialized // the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
int32_t notReadyTasks; int32_t notReadyTasks;
int32_t numOfWaitingUpstream; int32_t numOfWaitingUpstream;
int64_t checkReqId; int64_t checkReqId;
@ -432,8 +456,10 @@ struct SStreamTask {
typedef struct STaskStartInfo { typedef struct STaskStartInfo {
int64_t startTs; int64_t startTs;
int64_t readyTs; int64_t readyTs;
int32_t startAllTasksFlag; int32_t tasksWillRestart;
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
int32_t elapsedTime; int32_t elapsedTime;
} STaskStartInfo; } STaskStartInfo;
@ -732,8 +758,6 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)
// recover and fill history // recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
@ -755,7 +779,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
// common // common
@ -778,12 +804,11 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamScanHistoryData(SStreamTask* pTask); SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level // agg level
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta
@ -805,11 +830,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta); void streamMetaInitForSnode(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask); int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -1294,7 +1294,7 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
if (strcasecmp("keepColumnName", name) == 0) { if (strcasecmp("keepColumnName", name) == 0) {
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
} else if (strcasecmp("keepAliveIdle", name) == 0) { } else if (strcasecmp("keepAliveIdle", name) == 0) {
tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->bval; tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32;
} else { } else {
matchItem = false; matchItem = false;
} }

View File

@ -43,9 +43,9 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
// tqPush
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) #define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
// tqExec // tqExec
@ -156,9 +156,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq);

View File

@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqLaunchStreamTaskAsync(STQ* pTq);
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -1064,7 +1064,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
return code; return code;
} }
static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
@ -1105,7 +1105,11 @@ static void doStartStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq)
} }
} }
// this function should be executed by only one thread static void ddxx() {
}
// this function should be executed by only one thread, so we set an sentinel to protect this function
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1134,6 +1138,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
// let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) { if (pTask->execInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false); ASSERT(pTask->status.pauseAllowed == false);
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
@ -1147,7 +1152,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
} }
} else { } else {
if (pTask->execInfo.step2Start == 0) { if (pTask->execInfo.step2Start == 0) {
tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start); tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
} else { } else {
tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start); tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
@ -1167,20 +1173,37 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
streamScanHistoryData(pTask); int64_t st = taosGetTimestampMs();
SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__PAUSE) { pTask->execInfo.step1El += el;
if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
int8_t status = streamTaskSetSchedStatusInactive(pTask); int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0); atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
streamReExecScanHistoryFuture(pTask, retInfo.idleTime);
} else {
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr,
el, pTask->execInfo.step1El, status);
} else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, p,
pTask->execInfo.step1El);
}
}
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return 0; return 0;
} }
// the following procedure should be executed, no matter status is stop/pause or not // the following procedure should be executed, no matter status is stop/pause or not
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, el); tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
SStreamTask* pStreamTask = NULL; SStreamTask* pStreamTask = NULL;
@ -1203,23 +1226,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
doStartStep2(pTask, pStreamTask, pTq); doStartFillhistoryStep2(pTask, pStreamTask, pTq);
} else { } else {
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr); tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
} }
streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask);
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
// Not update the fill-history time window until the state transfer is completed if the related fill-history task // Not update the fill-history time window until the state transfer is completed.
// exists. tqDebug("s-task:%s scan-history in stream time window completed, start to handle data from WAL, startVer:%" PRId64
tqDebug( ", window:%" PRId64 " - %" PRId64,
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, startVer:%" PRId64 id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
code = streamTaskScanHistoryDataComplete(pTask); code = streamTaskScanHistoryDataComplete(pTask);
} }
@ -1297,14 +1317,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
tqStartStreamTask(pTq);
return 0;
}
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqScanWal(pTq); tqScanWal(pTq);
return 0; return 0;
} else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
tqStartStreamTasks(pTq);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
tqRestartStreamTasks(pTq);
return 0;
} }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
@ -1492,6 +1513,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
streamSchedExec(pTask); streamSchedExec(pTask);
} }
} else if (status == TASK_STATUS__UNINIT) { } else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ?
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; EStreamTaskEvent event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event); streamTaskHandleEvent(pTask->status.pSM, event);
@ -1890,7 +1912,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.startAllTasksFlag = 1; pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) { if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@ -1899,45 +1921,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.startAllTasksFlag = 0; pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} else { } else {
tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
terrno = 0;
streamMetaWUnLock(pMeta);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqLaunchStreamTaskAsync(pTq);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqStartStreamTaskAsync(pTq, true);
} }
} }

View File

@ -25,7 +25,7 @@ typedef struct STableSinkInfo {
tstr name; tstr name;
} STableSinkInfo; } STableSinkInfo;
static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks); static bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks);
static int32_t tsAscendingSortFn(const void* p1, const void* p2); static int32_t tsAscendingSortFn(const void* p1, const void* p2);
static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData); SSubmitTbData* pTableData);

View File

@ -22,6 +22,8 @@
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) { int32_t tqScanWal(STQ* pTq) {
@ -58,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) {
return 0; return 0;
} }
int32_t tqStartStreamTask(STQ* pTq) { int32_t tqStartStreamTasks(STQ* pTq) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -73,6 +75,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL); pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet); taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs(); pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -97,7 +100,7 @@ int32_t tqStartStreamTask(STQ* pTq) {
streamLaunchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
} }
streamMetaUpdateTaskReadyInfo(pTask); streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
continue; continue;
} }
@ -115,7 +118,67 @@ int32_t tqStartStreamTask(STQ* pTq) {
return code; return code;
} }
int32_t tqLaunchStreamTaskAsync(STQ* pTq) { int32_t tqRestartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTasks(pTq);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
@ -132,10 +195,10 @@ int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
return -1; return -1;
} }
tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks); tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID; pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
@ -320,14 +383,13 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
return false; return false;
} }
static bool taskReadyForDataFromWal(SStreamTask* pTask) { bool taskReadyForDataFromWal(SStreamTask* pTask) {
// non-source or fill-history tasks don't need to response the WAL scan action. // non-source or fill-history tasks don't need to response the WAL scan action.
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
return false; return false;
} }
// not in ready state, do not handle the data from wal // not in ready state, do not handle the data from wal
// int32_t status = pTask->status.taskStatus;
char* p = NULL; char* p = NULL;
int32_t status = streamTaskGetStatus(pTask, &p); int32_t status = streamTaskGetStatus(pTask, &p);
if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) { if (streamTaskGetStatus(pTask, &p) != TASK_STATUS__READY) {
@ -359,7 +421,7 @@ static bool taskReadyForDataFromWal(SStreamTask* pTask) {
return true; return true;
} }
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0; int32_t numOfNewItems = 0;
@ -449,7 +511,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
tqDebug("s-task:%s lock", pTask->id.idStr);
char* p = NULL; char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);

View File

@ -846,14 +846,14 @@ static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) {
if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) { if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[0].pin = true; pInfo->blockData[0].pin = true;
ASSERT(!pInfo->blockData[1].pin); ASSERT(!pInfo->blockData[1].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id); tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id);
return; return;
} }
if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) { if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) {
pInfo->blockData[1].pin = true; pInfo->blockData[1].pin = true;
ASSERT(!pInfo->blockData[0].pin); ASSERT(!pInfo->blockData[0].pin);
tsdbDebug("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id);
return; return;
} }

View File

@ -565,9 +565,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
w.skey = pScanInfo->lastKey + step; w.skey = pScanInfo->lastProcKey + step;
} else { } else {
w.ekey = pScanInfo->lastKey + step; w.ekey = pScanInfo->lastProcKey + step;
} }
if (isEmptyQueryTimeWindow(&w)) { if (isEmptyQueryTimeWindow(&w)) {
@ -607,14 +607,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
clearBrinBlockIter(&iter); clearBrinBlockIter(&iter);
pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size; pBlockNum->numOfSttFiles = pReader->status.pCurrentFileset->lvlArr->size;
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; int32_t total = pBlockNum->numOfSttFiles + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug( tsdbDebug(
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed " "load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s", "time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles, numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfSttFiles,
sizeInDisk / 1000.0, el, pReader->idStr); sizeInDisk / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total; pReader->cost.numOfBlocks += total;
@ -1200,13 +1200,12 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p
} }
} }
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) { static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo, int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order); bool ascScan = ASCENDING_TRAVERSE(order);
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
int64_t key = 0; int64_t key = 0;
if (bHasDataInLastBlock) { if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt); key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt);
} else { } else {
key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey; key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey;
@ -1215,10 +1214,10 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader
return key; return key;
} }
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo,
SLastBlockReader* pLastBlockReader, int32_t order) { int32_t order) {
bool ascScan = ASCENDING_TRAVERSE(order); bool ascScan = ASCENDING_TRAVERSE(order);
int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order); int64_t key = getBoarderKeyInFiles(pBlock, pScanInfo, order);
return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) || return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) ||
(!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key)); (!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key));
@ -1302,10 +1301,9 @@ typedef struct {
} SDataBlockToLoadInfo; } SDataBlockToLoadInfo;
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
STsdbReader* pReader) {
int32_t neighborIndex = 0;
SBrinRecord rec = {0}; SBrinRecord rec = {0};
int32_t neighborIndex = 0;
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
pReader->info.order, &rec); pReader->info.order, &rec);
@ -1319,9 +1317,11 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0); pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order); pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order);
if (hasDataInLastBlock(pLastBlockReader)) { ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
pInfo->overlapWithLastBlock = !(pBlockInfo->record.lastKey < tsLast || pBlockInfo->record.firstKey > tsLast); int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
pInfo->overlapWithLastBlock =
!(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt);
} }
pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity; pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity;
@ -1336,9 +1336,9 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
// 5. delete info should not overlap with current block data // 5. delete info should not overlap with current block data
// 6. current block should not contain the duplicated ts // 6. current block should not contain the duplicated ts
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { TSDBKEY keyInBuf) {
SDataBlockToLoadInfo info = {0}; SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool loadDataBlock = bool loadDataBlock =
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf || (info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
@ -1358,9 +1358,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
} }
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { TSDBKEY keyInBuf) {
SDataBlockToLoadInfo info = {0}; SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
info.overlapWithDelInfo || info.overlapWithLastBlock); info.overlapWithDelInfo || info.overlapWithLastBlock);
return isCleanFileBlock; return isCleanFileBlock;
@ -1417,14 +1417,15 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return code; return code;
} }
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) { SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) { // the next value will be the accessed key in stt if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->lastKeyInStt += step; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey += step;
return false; return false;
} }
@ -1433,10 +1434,11 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
pLastBlockReader->currentKey = key; pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key; pScanInfo->sttKeyInfo.nextProcKey = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order, if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
pVerRange)) { pVerRange)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true; return true;
} }
} }
@ -1457,7 +1459,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
// avoid the fetch next row replace the referenced stt block in buffer // avoid the fetch next row replace the referenced stt block in buffer
doPinSttBlock(pLastBlockReader); doPinSttBlock(pLastBlockReader);
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange); bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pLastBlockReader); doUnpinSttBlock(pLastBlockReader);
if (hasVal) { if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
@ -1694,7 +1696,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
} }
if (copied) { if (copied) {
pBlockScanInfo->lastKey = tsLastBlock; pBlockScanInfo->lastProcKey = tsLastBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL); code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -2062,9 +2064,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* d = NULL; STbData* d = NULL;
TSDBKEY startKey = {0}; TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer}; startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey + 1, .version = pReader->info.verRange.minVer};
} else { } else {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer}; startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey - 1, .version = pReader->info.verRange.maxVer};
} }
int32_t code = int32_t code =
@ -2129,9 +2131,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
STimeWindow w = pLBlockReader->window; STimeWindow w = pLBlockReader->window;
if (ASCENDING_TRAVERSE(pLBlockReader->order)) { if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
w.skey = pScanInfo->lastKeyInStt; w.skey = pScanInfo->sttKeyInfo.nextProcKey;
} else { } else {
w.ekey = pScanInfo->lastKeyInStt; w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
@ -2164,7 +2166,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
initMemDataIterator(pScanInfo, pReader); initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
int64_t el = taosGetTimestampUs() - st; int64_t el = taosGetTimestampUs() - st;
pReader->cost.initLastBlockReader += (el / 1000.0); pReader->cost.initLastBlockReader += (el / 1000.0);
@ -2209,7 +2211,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} }
if (copied) { if (copied) {
pBlockScanInfo->lastKey = key; pBlockScanInfo->lastProcKey = key;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
@ -2354,16 +2356,16 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly // it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) && int64_t cap = pReader->resBlockInfo.capacity;
(pRecord->numRow <= pReader->resBlockInfo.capacity)) { if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) {
if (asc || (!hasDataInLastBlock(pLastBlockReader))) { if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
code = copyBlockDataToSDataBlock(pReader); code = copyBlockDataToSDataBlock(pReader);
if (code) { if (code) {
goto _end; goto _end;
} }
// record the last key value // record the last key value
pBlockScanInfo->lastKey = asc ? pRecord->lastKey : pRecord->firstKey; pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey;
goto _end; goto _end;
} }
} }
@ -2378,6 +2380,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader);
while (1) { while (1) {
bool hasBlockData = false; bool hasBlockData = false;
@ -2527,7 +2530,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) { static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0; pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastFiles = 0; pBlockNum->numOfSttFiles = 0;
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk)); SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk));
@ -2564,7 +2567,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return code; return code;
} }
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) { if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
break; break;
} }
} }
@ -2684,11 +2687,13 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
} }
} }
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) { static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
if(!hasDataInLastBlock(pLastBlockReader)) { ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
return true; return true;
} else { } else {
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader); int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt); return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt);
} }
} }
@ -2717,10 +2722,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return terrno; return terrno;
} }
initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
}
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf)) {
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -2728,13 +2735,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block // build composed data block
code = buildComposedDataBlock(pReader); code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) { } else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pScanInfo, pReader->info.order)) {
// data in memory that are earlier than current file block and stt blocks // data in memory that are earlier than current file block and stt blocks
// rows in buffer should be less than the file block in asc, greater than file block in desc // rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order); int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) { if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) {
// whole block is required, return it directly // whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info; SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow; pInfo->rows = pBlockInfo->record.numRow;
@ -2745,7 +2752,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order); setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table // update the last key for the corresponding table
pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey; pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64 tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, " " clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
@ -2760,8 +2767,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
tsdbDebug("load data in last block firstly %s", pReader->idStr); tsdbDebug("load data in last block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// let's load data from stt files
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
// no data in last block, no need to proceed. // no data in last block, no need to proceed.
while (hasDataInLastBlock(pLastBlockReader)) { while (hasDataInLastBlock(pLastBlockReader)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -2988,7 +3000,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
if (pBlockInfo) { if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) { if (pScanInfo) {
lastKey = pScanInfo->lastKey; lastKey = pScanInfo->lastProcKey;
} }
pDumpInfo->totalRows = pBlockInfo->record.numRow; pDumpInfo->totalRows = pBlockInfo->record.numRow;
@ -3013,7 +3025,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
} }
// all data files are consumed, try data in buffer // all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastFiles == 0) { if (num.numOfBlocks + num.numOfSttFiles == 0) {
pReader->status.loadFromFile = false; pReader->status.loadFromFile = false;
taosArrayDestroy(pTableList); taosArrayDestroy(pTableList);
return code; return code;
@ -3458,15 +3470,15 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) { while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else { } else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
idStr); pScanInfo->sttKeyInfo.nextProcKey, idStr);
break; break;
} }
} }
@ -3722,7 +3734,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
pBlock->info.rows += 1; pBlock->info.rows += 1;
pScanInfo->lastKey = pTSRow->ts; pScanInfo->lastProcKey = pTSRow->ts;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3856,14 +3868,15 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
// todo extract method // todo extract method
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
int64_t skey = pReader->info.window.skey; int64_t skey = pReader->info.window.skey;
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->lastKeyInStt = skey; pInfo->sttKeyInfo.nextProcKey = skey;
} else { } else {
int64_t ekey = pReader->info.window.ekey; int64_t ekey = pReader->info.window.ekey;
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->lastKeyInStt = ekey; pInfo->sttKeyInfo.nextProcKey = ekey;
} }
pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES); tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
} }
@ -4224,7 +4237,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
if (pBlockScanInfo) { if (pBlockScanInfo) {
// save lastKey to restore memory iterator // save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
// reset current current table's data block scan info, // reset current current table's data block scan info,
pBlockScanInfo->iterInit = false; pBlockScanInfo->iterInit = false;

View File

@ -157,17 +157,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey; int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey; pScanInfo->sttKeyInfo.nextProcKey = skey;
} else { } else {
int64_t ekey = pTsdbReader->info.window.ekey; int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey; pScanInfo->sttKeyInfo.nextProcKey = ekey;
} }
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr); pScanInfo->lastProcKey, pTsdbReader->idStr);
} }
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
@ -200,8 +201,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
} }
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastKey = ts; pInfo->lastProcKey = ts;
pInfo->lastKeyInStt = ts + step; pInfo->sttKeyInfo.nextProcKey = ts + step;
} }
} }
@ -241,6 +242,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
taosArrayClear(pScanInfo->pBlockList); taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pBlockIdxList); taosArrayClear(pScanInfo->pBlockIdxList);
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
} }
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) { void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {

View File

@ -63,20 +63,31 @@ typedef struct STableDataBlockIdx {
int32_t globalIndex; int32_t globalIndex;
} STableDataBlockIdx; } STableDataBlockIdx;
typedef enum ESttKeyStatus {
STT_FILE_READER_UNINIT = 0x0,
STT_FILE_NO_DATA = 0x1,
STT_FILE_HAS_DATA = 0x2,
} ESttKeyStatus;
typedef struct SSttKeyInfo {
ESttKeyStatus status; // this value should be updated when switch to the next fileset
int64_t nextProcKey;
} SSttKeyInfo;
typedef struct STableBlockScanInfo { typedef struct STableBlockScanInfo {
uint64_t uid; uint64_t uid;
TSKEY lastKey; TSKEY lastProcKey;
TSKEY lastKeyInStt; // last accessed key in stt SSttKeyInfo sttKeyInfo;
SArray* pBlockList; // block data index list, SArray<SBrinRecord> SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pBlockIdxList; // SArray<STableDataBlockIndx> SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
SArray* pMemDelData; // SArray<SDelData> SArray* pMemDelData; // SArray<SDelData>
SArray* pFileDelData; // SArray<SDelData> from each file set SArray* pFileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index int32_t fileDelIndex; // file block delete index
int32_t sttBlockDelIndex; // delete index for last block int32_t sttBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo; } STableBlockScanInfo;
typedef struct SResultBlockInfo { typedef struct SResultBlockInfo {
@ -108,7 +119,7 @@ typedef struct STableUidList {
typedef struct { typedef struct {
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfLastFiles; int32_t numOfSttFiles;
} SBlockNumber; } SBlockNumber;
typedef struct SBlockIndex { typedef struct SBlockIndex {

View File

@ -557,7 +557,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pMeta->startInfo.startAllTasksFlag) { if (pMeta->startInfo.tasksWillRestart) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return; return;
@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqResetStreamTaskStatus(pVnode->pTq); tqResetStreamTaskStatus(pVnode->pTq);
tqLaunchStreamTaskAsync(pVnode->pTq); tqStartStreamTaskAsync(pVnode->pTq, false);
} }
} else { } else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);

View File

@ -871,32 +871,6 @@ int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList); return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
} }
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
if (pTaskInfo->pRoot == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t nOptrWithVal = 0;
// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
// taosMemoryFreeClear(*pOutput);
// *len = 0;
// }
return 0;
}
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
return TSDB_CODE_INVALID_PARA;
}
return 0;
// return decodeOperator(pTaskInfo->pRoot, pInput, len);
}
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
@ -1072,7 +1046,7 @@ int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
} }
} }
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished; return pTaskInfo->streamInfo.recoverScanFinished;
} }

View File

@ -1584,8 +1584,7 @@ typedef union SRowsDataContext{
SStbRowsDataContext* pStbRowsCxt; SStbRowsDataContext* pStbRowsCxt;
} SRowsDataContext; } SRowsDataContext;
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) {
char* ctbName, bool* pFoundCtbName) {
*pFoundCtbName = false; *pFoundCtbName = false;
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS){ if (code == TSDB_CODE_SUCCESS){
@ -1595,7 +1594,13 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
if (pToken->n > 0) { if (pToken->n > 0) {
if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) { if (pToken->n <= TSDB_TABLE_NAME_LEN - 1) {
memcpy(pStbRowsCxt->ctbName.tname, pToken->z, pToken->n); for (int i = 0; i < pToken->n; ++i) {
if (pToken->z[i] == '.') {
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not contain '.'");
} else {
pStbRowsCxt->ctbName.tname[i] = pToken->z[i];
}
}
pStbRowsCxt->ctbName.tname[pToken->n] = '\0'; pStbRowsCxt->ctbName.tname[pToken->n] = '\0';
*pFoundCtbName = true; *pFoundCtbName = true;
} else { } else {
@ -1677,8 +1682,7 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
} }
} }
else if (pCols->pColIndex[i] == tbnameIdx) { else if (pCols->pColIndex[i] == tbnameIdx) {
char ctbName[TSDB_TABLE_NAME_LEN]; code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName);
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, bFoundTbName);
} }
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) { if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {

View File

@ -266,7 +266,7 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
if (NULL == pScanCols) { if (NULL == pScanCols) {
if (NULL == pScanPseudoCols) { if (NULL == pScanPseudoCols) {
return SCAN_TYPE_TABLE; return (!tagScan) ? SCAN_TYPE_TABLE : SCAN_TYPE_TAG;
} }
return FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType return FUNCTION_TYPE_BLOCK_DIST_INFO == ((SFunctionNode*)nodesListGetNode(pScanPseudoCols, 0))->funcType
? SCAN_TYPE_BLOCK_INFO ? SCAN_TYPE_BLOCK_INFO

View File

@ -127,13 +127,11 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask); int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskExtractKey(const SStreamTask* pTask); STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue); void streamQueueProcessSuccess(SStreamQueue* queue);
@ -159,6 +157,9 @@ int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id); int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name); int deleteCheckpointFile(char* id, char* name);
int32_t onNormalTaskReady(SStreamTask* pTask);
int32_t onScanhistoryTaskReady(SStreamTask* pTask);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1007,7 +1007,6 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
info.msg.info = *pRpcInfo; info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
stDebug("s-task:%s lock", pTask->id.idStr);
if (pTask->pRspMsgList == NULL) { if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));

View File

@ -18,7 +18,8 @@
// maximum allowed processed block batches. One block may include several submit blocks // maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_STREAM_EXEC_BATCH_NUM 32
#define STREAM_RESULT_DUMP_THRESHOLD 300 #define STREAM_RESULT_DUMP_THRESHOLD 300
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
@ -48,10 +49,9 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
} }
streamDispatchStreamBlock(pTask); streamDispatchStreamBlock(pTask);
return code;
} }
return 0; return code;
} }
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
@ -187,83 +187,118 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
return code; return code;
} }
int32_t streamScanHistoryData(SStreamTask* pTask) { static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
int32_t code = TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
}
} else {
taosArrayDestroy(pRes);
}
return code;
}
static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* pSize, bool* pFinish) {
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor;
int32_t numOfBlocks = 0;
while (1) {
if (streamTaskShouldStop(pTask)) {
break;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stDebug("s-task:%s level:%d inputQ is blocked, retry in 5s", pTask->id.idStr, pTask->info.taskLevel);
break;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("s-task:%s scan-history data error occurred code:%s, continue scan-history", pTask->id.idStr,
tstrerror(code));
continue;
}
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL) {
(*pFinish) = qStreamScanhistoryFinished(exec);
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
(*pSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
numOfBlocks += 1;
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || (*pSize) >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(*pSize), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break;
}
}
}
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
int32_t code = TSDB_CODE_SUCCESS;
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
bool finished = false; bool finished = false;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
while (!finished) { while (1) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr);
stDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); // quit from step1, not continue to handle the step2
break; return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr,
tstrerror(terrno));
continue;
} }
int32_t size = 0; int32_t size = 0;
int32_t numOfBlocks = 0; streamScanHistoryDataImpl(pTask, pRes, &size, &finished);
while (1) {
if (streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { if(streamTaskShouldStop(pTask)) {
stDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosMsleep(10000); return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0};
continue;
}
SSDataBlock* output = NULL;
uint64_t ts = 0;
code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
stError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue;
}
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL) {
finished = qStreamRecoverScanFinished(exec);
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
block.info.childId = pTask->info.selfChildId;
taosArrayPush(pRes, &block);
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%.2fKiB output num-limit:%d, size-limit:%.2fKiB reached",
pTask->id.idStr, numOfBlocks, SIZE_IN_KiB(size), STREAM_RESULT_DUMP_THRESHOLD,
SIZE_IN_KiB(STREAM_RESULT_DUMP_SIZE_THRESHOLD));
break;
}
} }
if (taosArrayGetSize(pRes) > 0) { // dispatch the generated results
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); int32_t code = handleResultBlocks(pTask, pRes, size);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { int64_t el = taosGetTimestampMs() - st;
return code;
} // downstream task input queue is full, try in 5sec
} else { if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
taosArrayDestroy(pRes); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
}
if (finished) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
}
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms",
pTask->id.idStr, pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};
} }
} }
return 0;
} }
// wait for the stream task to be idle // wait for the stream task to be idle
@ -273,7 +308,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) { while (!streamTaskIsIdle(pStreamTask)) {
stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel, stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
pStreamTask->id.idStr); pStreamTask->id.idStr);
taosMsleep(100); taosMsleep(100);
} }
@ -647,23 +682,25 @@ int32_t streamExecTask(SStreamTask* pTask) {
int32_t streamTaskReleaseState(SStreamTask* pTask) { int32_t streamTaskReleaseState(SStreamTask* pTask) {
stDebug("s-task:%s release exec state", pTask->id.idStr); stDebug("s-task:%s release exec state", pTask->id.idStr);
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
if (pExecutor != NULL) { if (pExecutor != NULL) {
int32_t code = qStreamOperatorReleaseState(pExecutor); code = qStreamOperatorReleaseState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
} }
return code;
} }
int32_t streamTaskReloadState(SStreamTask* pTask) { int32_t streamTaskReloadState(SStreamTask* pTask) {
stDebug("s-task:%s reload exec state", pTask->id.idStr); stDebug("s-task:%s reload exec state", pTask->id.idStr);
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
int32_t code = TSDB_CODE_SUCCESS;
if (pExecutor != NULL) { if (pExecutor != NULL) {
int32_t code = qStreamOperatorReloadState(pExecutor); code = qStreamOperatorReloadState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
} }
return code;
} }
int32_t streamAlignTransferState(SStreamTask* pTask) { int32_t streamAlignTransferState(SStreamTask* pTask) {

View File

@ -150,6 +150,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pReadyTaskSet == NULL) { if (pMeta->startInfo.pReadyTaskSet == NULL) {
goto _err;
}
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
if (pMeta->startInfo.pFailedTaskSet == NULL) {
goto _err;
} }
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
@ -221,6 +227,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
stError("failed to open stream meta"); stError("failed to open stream meta");
@ -228,12 +235,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
int32_t streamMetaReopen(SStreamMeta* pMeta) { int32_t streamMetaReopen(SStreamMeta* pMeta) {
// backup the restart flag
int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
streamMetaClear(pMeta); streamMetaClear(pMeta);
pMeta->startInfo.startAllTasksFlag = restartFlag;
// NOTE: role should not be changed during reopen meta // NOTE: role should not be changed during reopen meta
pMeta->streamBackendRid = -1; pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL; pMeta->streamBackend = NULL;
@ -302,7 +305,10 @@ void streamMetaClear(SStreamMeta* pMeta) {
pMeta->numOfPausedTasks = 0; pMeta->numOfPausedTasks = 0;
pMeta->chkptNotReadyTasks = 0; pMeta->chkptNotReadyTasks = 0;
streamMetaResetStartInfo(&pMeta->startInfo); // the willrestart/starting flag can NOT be cleared
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.readyTs = 0;
} }
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
@ -342,6 +348,7 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
taosMemoryFree(pMeta->pHbInfo); taosMemoryFree(pMeta->pHbInfo);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
@ -1093,8 +1100,11 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pReadyTaskSet); taosHashClear(pStartInfo->pReadyTaskSet);
pStartInfo->startAllTasksFlag = 0; taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0; pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0
atomic_store_32(&pStartInfo->taskStarting, 0);
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
@ -1104,7 +1114,6 @@ void streamMetaRLock(SStreamMeta* pMeta) {
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);

View File

@ -159,7 +159,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
// no available token in bucket for sink task, let's wait for a little bit // no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id); stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
taosMsleep(10);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -340,10 +341,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return 0; return 0;
} }
// the result should be put into the outputQ in any cases, otherwise, the result may be lost // the result should be put into the outputQ in any cases, the result may be lost otherwise.
int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) {
STaosQueue* pQueue = pTask->outputq.queue->pQueue; STaosQueue* pQueue = pTask->outputq.queue->pQueue;
// wait for the output queue is available for new data to dispatch
while (streamQueueIsFull(pTask->outputq.queue)) { while (streamQueueIsFull(pTask->outputq.queue)) {
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr);
@ -373,7 +375,8 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate) { int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
const char* id) {
if (numCap < 10 || numRate < 10 || pBucket == NULL) { if (numCap < 10 || numRate < 10 || pBucket == NULL) {
stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate); stError("failed to init sink task bucket, cap:%d, rate:%d", numCap, numRate);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
@ -388,6 +391,7 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaRemain = pBucket->quotaCapacity; pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs(); pBucket->fillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -406,12 +410,12 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
double incSize = (delta / 1000.0) * pBucket->quotaRate; double incSize = (delta / 1000.0) * pBucket->quotaRate;
if (incSize > 0) { if (incSize > 0) {
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
pBucket->fillTimestamp = now;
} }
if (incNum > 0 || incSize > 0) { if (incNum > 0 || incSize > 0) {
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s",
" idle for %.2f Sec, %s", pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id);
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);
} }
} }

View File

@ -19,6 +19,10 @@
#include "wal.h" #include "wal.h"
#include "streamsm.h" #include "streamsm.h"
#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
typedef struct SLaunchHTaskInfo { typedef struct SLaunchHTaskInfo {
SStreamMeta* pMeta; SStreamMeta* pMeta;
STaskId id; STaskId id;
@ -30,6 +34,12 @@ typedef struct STaskRecheckInfo {
void* checkTimer; void* checkTimer;
} STaskRecheckInfo; } STaskRecheckInfo;
typedef struct STaskInitTs {
int64_t start;
int64_t end;
bool success;
} STaskInitTs;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
@ -57,7 +67,7 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p); pTask->id.idStr, numOfDowns, el, p);
streamMetaUpdateTaskReadyInfo(pTask); streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -81,6 +91,60 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
return 0; return 0;
} }
static void doReExecScanhistory(void* param, void* tmrId) {
SStreamTask* pTask = param;
pTask->schedHistoryInfo.numOfTicks -= 1;
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
streamMetaReleaseTask(pTask->pMeta, pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p, ref);
return;
}
if (pTask->schedHistoryInfo.numOfTicks <= 0) {
streamStartScanHistoryAsync(pTask, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer,
&pTask->schedHistoryInfo.pTimer);
}
}
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
numOfTicks = 1;
} else if (numOfTicks > SCANHISTORY_IDLE_TICK) {
numOfTicks = SCANHISTORY_IDLE_TICK;
}
// add ref for task
SStreamTask* p = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
ASSERT(p != NULL);
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
}
static int32_t doStartScanHistoryTask(SStreamTask* pTask) { static int32_t doStartScanHistoryTask(SStreamTask* pTask) {
SVersionRange* pRange = &pTask->dataRange.range; SVersionRange* pRange = &pTask->dataRange.range;
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
@ -318,6 +382,31 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
streamTaskOnHandleEventSuccess(pTask->status.pSM, event); streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
} }
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == nodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = nodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
@ -367,40 +456,23 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
doProcessDownstreamReadyRsp(pTask); doProcessDownstreamReadyRsp(pTask);
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
stError( if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " stError(
"not check wait for downstream task nodeUpdate, and all tasks restart", "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, "
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); "not check wait for downstream task nodeUpdate, and all tasks restart",
} else { id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { } else {
stError( stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed", "downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
bool existed = false;
for (int i = 0; i < num; ++i) {
SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, i);
if (p->nodeId == pRsp->downstreamNodeId) {
existed = true;
break;
}
}
if (!existed) {
SDownstreamTaskEpset t = {.nodeId = pRsp->downstreamNodeId};
taosArrayPush(pTask->outputInfo.pDownstreamUpdateList, &t);
stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", id, vgId,
t.nodeId, (int32_t)taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList));
}
taosThreadMutexUnlock(&pTask->lock);
return 0;
} }
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
@ -676,9 +748,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
int32_t hTaskId = pHTaskInfo->id.taskId; int32_t hTaskId = pHTaskInfo->id.taskId;
streamTaskGetStatus(pTask, &p); streamTaskGetStatus(pTask, &p);
stDebug( stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
"s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
@ -975,28 +1046,57 @@ void streamTaskEnablePause(SStreamTask* pTask) {
pTask->status.pauseAllowed = 1; pTask->status.pauseAllowed = 1;
} }
int32_t streamMetaUpdateTaskReadyInfo(SStreamTask* pTask) { static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
int32_t vgId = pMeta->vgId;
void* pIter = NULL;
size_t keyLen = 0;
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
succ ? "success" : "failed");
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
}
}
}
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
STaskId id = streamTaskExtractKey(pTask); STaskId id = streamTaskExtractKey(pTask);
taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); STaskStartInfo* pStartInfo = &pMeta->startInfo;
SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
pStartInfo->readyTs = pTask->execInfo.start; pStartInfo->readyTs = pTask->execInfo.start;
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
streamMetaResetStartInfo(pStartInfo); stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs", ", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0); pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);

View File

@ -400,8 +400,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask->outputInfo.pTokenBucket); taosMemoryFree(pTask->outputInfo.pTokenBucket);
taosThreadMutexDestroy(&pTask->lock); taosThreadMutexDestroy(&pTask->lock);
taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList); pTask->outputInfo.pDownstreamUpdateList = taosArrayDestroy(pTask->outputInfo.pDownstreamUpdateList);
pTask->outputInfo.pDownstreamUpdateList = NULL;
taosMemoryFree(pTask); taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId); stDebug("s-task:0x%x free task completed", taskId);
@ -447,7 +446,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
// 2MiB per second for sink task // 2MiB per second for sink task
// 50 times sink operator per second // 50 times sink operator per second
streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate); streamTaskInitTokenBucket(pTask->outputInfo.pTokenBucket, 50, 50, tsSinkDataRate, pTask->id.idStr);
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr); int code = taosThreadMutexAttrInit(&attr);

View File

@ -315,7 +315,6 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s unlockx", pTask->id.idStr);
return TSDB_CODE_STREAM_INVALID_STATETRANS; return TSDB_CODE_STREAM_INVALID_STATETRANS;
} }

View File

@ -865,6 +865,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
if (pMsg->seq > pRcvBuf->cursor) { if (pMsg->seq > pRcvBuf->cursor) {
if (pRcvBuf->entries[pMsg->seq % pRcvBuf->size]) {
pRcvBuf->entryDeleteCb(pRcvBuf->entries[pMsg->seq % pRcvBuf->size]);
}
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg; pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL; ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
@ -1002,7 +1005,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
} }
if (pMsg->term < raftStoreGetTerm(pSyncNode)) { if (pMsg->term < raftStoreGetTerm(pSyncNode)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term"); sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
pMsg->seq);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1; return -1;
} }

View File

@ -268,7 +268,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
taosPrintLog(flags, level, dflag, taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
", seq:%d, ack:%d, " ", seq:%d, ack:%d, "
" buf:[%" PRId64 " %" PRId64 ", %" PRId64 " buf:[%" PRId64 " %" PRId64 ", %" PRId64
"), finish:%d, as:%d, to-dnode:%d}" "), finish:%d, as:%d, to-dnode:%d}"

View File

@ -70,6 +70,7 @@ sql_error insert into d2.st values(now, 1, 1)
sql_error insert into d2.st(ts, f) values(now, 1); sql_error insert into d2.st(ts, f) values(now, 1);
sql_error insert into d2.st(ts, f, tbname) values(now, 1); sql_error insert into d2.st(ts, f, tbname) values(now, 1);
sql_error insert into d2.st(ts, f, tbname) values(now, 1, ''); sql_error insert into d2.st(ts, f, tbname) values(now, 1, '');
sql_error insert into d2.st(ts, f, tbname) values(now, 1, 'd2.ct2');
sql_error insert into d2.st(ts, tbname) values(now, 1, 34) sql_error insert into d2.st(ts, tbname) values(now, 1, 34)
sql_error insert into st using st2 tags(2) values(now,1); sql_error insert into st using st2 tags(2) values(now,1);
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -164,4 +164,9 @@ sql select count(*) from (select tags ts from stb34)
if $data00 != 2 then if $data00 != 2 then
return -1 return -1
endi endi
sql select tags 2 from stb34
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT