Merge pull request #22261 from taosdata/fix/3_liaohj
fix(stream): set the correct end key of delete block.
This commit is contained in:
commit
2640e41705
|
@ -221,13 +221,9 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
|
||||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||||
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
|
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
|
||||||
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
||||||
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
|
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo);
|
||||||
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
|
|
||||||
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
|
|
||||||
void resetTaskInfo(qTaskInfo_t tinfo);
|
void resetTaskInfo(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);
|
|
||||||
|
|
||||||
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
|
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
|
||||||
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
|
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
|
||||||
|
|
||||||
|
|
|
@ -604,15 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
|
||||||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
|
||||||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
|
||||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||||
|
|
||||||
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
|
|
||||||
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
|
|
||||||
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
|
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
int32_t streamSetStatusNormal(SStreamTask* pTask);
|
int32_t streamSetStatusNormal(SStreamTask* pTask);
|
||||||
const char* streamGetTaskStatusStr(int32_t status);
|
const char* streamGetTaskStatusStr(int32_t status);
|
||||||
|
@ -626,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask);
|
||||||
// source level
|
// source level
|
||||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||||
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
|
||||||
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
||||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -1321,7 +1321,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
|
"s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
|
||||||
"window:%" PRId64 " - %" PRId64,
|
"window:%" PRId64 " - %" PRId64,
|
||||||
id, pWindow->skey, pWindow->ekey);
|
id, pWindow->skey, pWindow->ekey);
|
||||||
qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
|
qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor);
|
||||||
} else {
|
} else {
|
||||||
// when related fill-history task exists, update the fill-history time window only when the
|
// when related fill-history task exists, update the fill-history time window only when the
|
||||||
// state transfer is completed.
|
// state transfer is completed.
|
||||||
|
@ -1592,9 +1592,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||||
pReq->taskId);
|
pReq->taskId);
|
||||||
|
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1606,9 +1605,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
if (pTask->historyTaskId.taskId != 0) {
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL) {
|
||||||
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success",
|
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||||
|
@ -1616,14 +1614,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||||
streamTaskPause(pHistoryTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamTaskPause(pHistoryTask);
|
||||||
if (pHistoryTask != NULL) {
|
|
||||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1652,7 +1648,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
streamStartRecoverTask(pTask, igUntreated);
|
streamStartScanHistoryAsync(pTask, igUntreated);
|
||||||
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -62,8 +62,8 @@ typedef struct {
|
||||||
SSchemaWrapper* schema;
|
SSchemaWrapper* schema;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
|
||||||
int8_t recoverStep;
|
int8_t recoverStep;
|
||||||
bool recoverStep1Finished;
|
// bool recoverStep1Finished;
|
||||||
bool recoverStep2Finished;
|
// bool recoverStep2Finished;
|
||||||
int8_t recoverScanFinished;
|
int8_t recoverScanFinished;
|
||||||
SQueryTableDataCond tableCond;
|
SQueryTableDataCond tableCond;
|
||||||
SVersionRange fillHistoryVer;
|
SVersionRange fillHistoryVer;
|
||||||
|
|
|
@ -116,17 +116,6 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
|
||||||
clearStreamBlock(pTaskInfo->pRoot);
|
clearStreamBlock(pTaskInfo->pRoot);
|
||||||
}
|
}
|
||||||
|
|
||||||
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
|
||||||
if (pTaskInfo == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX);
|
|
||||||
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
|
|
||||||
pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
if (pOperator->numOfDownstream == 0) {
|
if (pOperator->numOfDownstream == 0) {
|
||||||
|
@ -341,7 +330,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
qResetStreamInfoTimeWindow(pTaskInfo);
|
qStreamInfoResetTimewindowFilter(pTaskInfo);
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -891,8 +880,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
|
||||||
pStreamInfo->fillHistoryVer = *pVerRange;
|
pStreamInfo->fillHistoryVer = *pVerRange;
|
||||||
pStreamInfo->fillHistoryWindow = *pWindow;
|
pStreamInfo->fillHistoryWindow = *pWindow;
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
|
||||||
pStreamInfo->recoverStep1Finished = false;
|
|
||||||
pStreamInfo->recoverStep2Finished = false;
|
|
||||||
|
|
||||||
qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
|
qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
|
||||||
" - %" PRId64,
|
" - %" PRId64,
|
||||||
|
@ -910,8 +897,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
|
||||||
pStreamInfo->fillHistoryVer = *pVerRange;
|
pStreamInfo->fillHistoryVer = *pVerRange;
|
||||||
pStreamInfo->fillHistoryWindow = *pWindow;
|
pStreamInfo->fillHistoryWindow = *pWindow;
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
||||||
pStreamInfo->recoverStep1Finished = true;
|
|
||||||
pStreamInfo->recoverStep2Finished = false;
|
|
||||||
|
|
||||||
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
|
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
|
||||||
", window:%" PRId64 " - %" PRId64,
|
", window:%" PRId64 " - %" PRId64,
|
||||||
|
@ -1050,23 +1035,15 @@ bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
|
||||||
return pTaskInfo->streamInfo.recoverScanFinished;
|
return pTaskInfo->streamInfo.recoverScanFinished;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) {
|
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
return pTaskInfo->streamInfo.recoverStep1Finished;
|
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
|
||||||
}
|
|
||||||
|
|
||||||
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) {
|
qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64,
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
|
||||||
return pTaskInfo->streamInfo.recoverStep2Finished;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
|
pWindow->skey = INT64_MIN;
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
pWindow->ekey = INT64_MAX;
|
||||||
pTaskInfo->streamInfo.recoverStep1Finished = true;
|
|
||||||
pTaskInfo->streamInfo.recoverStep2Finished = true;
|
|
||||||
|
|
||||||
// reset the time window
|
|
||||||
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1590,21 +1590,19 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
|
||||||
}
|
}
|
||||||
|
|
||||||
// re-build the delete block, ONLY according to the split timestamp
|
// re-build the delete block, ONLY according to the split timestamp
|
||||||
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
|
static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
|
||||||
if (skey == INT64_MIN) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
|
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
|
||||||
bool hasUnqualified = false;
|
bool hasUnqualified = false;
|
||||||
|
int64_t skey = pWindow->skey;
|
||||||
|
int64_t ekey = pWindow->ekey;
|
||||||
|
|
||||||
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
|
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
|
||||||
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
|
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
|
||||||
|
|
||||||
|
if (pWindow->skey != INT64_MIN) {
|
||||||
for (int32_t i = 0; i < numOfRows; i++) {
|
for (int32_t i = 0; i < numOfRows; i++) {
|
||||||
if (tsStartCol[i] < skey) {
|
if (tsStartCol[i] < skey) {
|
||||||
tsStartCol[i] = skey;
|
tsStartCol[i] = skey;
|
||||||
|
@ -1616,12 +1614,27 @@ static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char
|
||||||
hasUnqualified = true;
|
hasUnqualified = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (pWindow->ekey != INT64_MAX) {
|
||||||
|
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||||
|
if (tsEndCol[i] > ekey) {
|
||||||
|
tsEndCol[i] = ekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsStartCol[i] <= ekey) {
|
||||||
|
p[i] = true;
|
||||||
|
} else {
|
||||||
|
hasUnqualified = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (hasUnqualified) {
|
if (hasUnqualified) {
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
|
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
|
||||||
|
} else {
|
||||||
|
qDebug("%s not update the delete block", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
|
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2030,7 +2043,7 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
|
|
||||||
setBlockGroupIdByUid(pInfo, pDelBlock);
|
setBlockGroupIdByUid(pInfo, pDelBlock);
|
||||||
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id);
|
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
|
||||||
printDataBlock(pDelBlock, "stream scan delete recv filtered");
|
printDataBlock(pDelBlock, "stream scan delete recv filtered");
|
||||||
if (pDelBlock->info.rows == 0) {
|
if (pDelBlock->info.rows == 0) {
|
||||||
if (pInfo->tqReader) {
|
if (pInfo->tqReader) {
|
||||||
|
|
|
@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
||||||
|
|
||||||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||||
|
|
||||||
extern int32_t streamBackendId;
|
extern int32_t streamBackendId;
|
||||||
extern int32_t streamBackendCfWrapperId;
|
extern int32_t streamBackendCfWrapperId;
|
||||||
|
|
|
@ -163,15 +163,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
void* exec = pTask->exec.pExecutor;
|
void* exec = pTask->exec.pExecutor;
|
||||||
|
|
||||||
qSetStreamOpOpen(exec);
|
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
|
|
||||||
while (1) {
|
qSetStreamOpOpen(exec);
|
||||||
|
|
||||||
|
while (!finished) {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
||||||
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
|
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
|
||||||
|
@ -184,44 +183,30 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t batchCnt = 0;
|
int32_t numOfBlocks = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldStop(&pTask->status)) {
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* output = NULL;
|
SSDataBlock* output = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
if (qExecTask(exec, &output, &ts) < 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (output == NULL) {
|
if (output == NULL && qStreamRecoverScanFinished(exec)) {
|
||||||
if (qStreamRecoverScanFinished(exec)) {
|
|
||||||
finished = true;
|
finished = true;
|
||||||
} else {
|
|
||||||
qSetStreamOpOpen(exec);
|
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
|
||||||
if (qRes == NULL) {
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
qRes->blocks = pRes;
|
|
||||||
code = streamTaskOutputResultBlock(pTask, qRes);
|
|
||||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
|
||||||
taosFreeQitem(qRes);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
} else {
|
||||||
|
if (output == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
|
@ -229,26 +214,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
block.info.childId = pTask->info.selfChildId;
|
block.info.childId = pTask->info.selfChildId;
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
|
|
||||||
batchCnt++;
|
numOfBlocks++;
|
||||||
|
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
|
||||||
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
|
if (numOfBlocks >= batchSz) {
|
||||||
if (batchCnt >= batchSz) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) == 0) {
|
if (taosArrayGetSize(pRes) > 0) {
|
||||||
taosArrayDestroy(pRes);
|
|
||||||
|
|
||||||
if (finished) {
|
|
||||||
qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||||
if (qRes == NULL) {
|
if (qRes == NULL) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
|
@ -258,57 +231,20 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
|
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
|
|
||||||
code = streamTaskOutputResultBlock(pTask, qRes);
|
code = streamTaskOutputResultBlock(pTask, qRes);
|
||||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (finished) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
|
|
||||||
// fetch all queue item, merge according to batchLimit
|
|
||||||
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
|
|
||||||
if (numOfItems == 0) {
|
|
||||||
qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
SStreamQueueItem* pMerged = NULL;
|
|
||||||
SStreamQueueItem* pItem = NULL;
|
|
||||||
taosGetQitem(pTask->inputQall, (void**)&pItem);
|
|
||||||
if (pItem == NULL) {
|
|
||||||
if (pMerged != NULL) {
|
|
||||||
// process merged item
|
|
||||||
} else {
|
} else {
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// if drop
|
|
||||||
if (pItem->type == STREAM_INPUT__DESTROY) {
|
|
||||||
// set status drop
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
|
||||||
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
|
|
||||||
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
|
|
||||||
}
|
|
||||||
|
|
||||||
// exec impl
|
|
||||||
|
|
||||||
// output
|
|
||||||
// try dispatch
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t updateCheckPointInfo(SStreamTask* pTask) {
|
int32_t updateCheckPointInfo(SStreamTask* pTask) {
|
||||||
int64_t ckId = 0;
|
int64_t ckId = 0;
|
||||||
|
@ -356,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask == NULL) {
|
if (pStreamTask == NULL) {
|
||||||
// todo: destroy this task here
|
// todo: destroy the fill-history task here
|
||||||
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
|
||||||
pTask->streamTaskId.taskId);
|
pTask->streamTaskId.taskId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
@ -402,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
|
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// expand the query time window for stream scanner
|
// 1. expand the query time window for stream task of WAL scanner
|
||||||
pTimeWindow->skey = INT64_MIN;
|
pTimeWindow->skey = INT64_MIN;
|
||||||
qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor);
|
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
||||||
|
|
||||||
// transfer the ownership of executor state
|
// 2. transfer the ownership of executor state
|
||||||
streamTaskReleaseState(pTask);
|
streamTaskReleaseState(pTask);
|
||||||
streamTaskReloadState(pStreamTask);
|
streamTaskReloadState(pStreamTask);
|
||||||
|
|
||||||
// clear the link between fill-history task and stream task info
|
// 3. clear the link between fill-history task and stream task info
|
||||||
pStreamTask->historyTaskId.taskId = 0;
|
pStreamTask->historyTaskId.taskId = 0;
|
||||||
|
|
||||||
|
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
|
||||||
|
// pause, since the pause allowed attribute is not set yet.
|
||||||
streamTaskResumeFromHalt(pStreamTask);
|
streamTaskResumeFromHalt(pStreamTask);
|
||||||
|
|
||||||
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
|
||||||
int32_t taskId = pTask->id.taskId;
|
int32_t taskId = pTask->id.taskId;
|
||||||
|
|
||||||
// free it and remove it from disk meta-store
|
// 5. free it and remove fill-history task from disk meta-store
|
||||||
streamMetaUnregisterTask(pMeta, taskId);
|
streamMetaUnregisterTask(pMeta, taskId);
|
||||||
|
|
||||||
// save to disk
|
// 6. save to disk
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
streamMetaSaveTask(pMeta, pStreamTask);
|
streamMetaSaveTask(pMeta, pStreamTask);
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
// pause allowed
|
// 7. pause allowed.
|
||||||
streamTaskEnablePause(pStreamTask);
|
streamTaskEnablePause(pStreamTask);
|
||||||
|
|
||||||
streamSchedExec(pStreamTask);
|
streamSchedExec(pStreamTask);
|
||||||
|
@ -437,6 +375,26 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (!pTask->status.transferState) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t level = pTask->info.taskLevel;
|
||||||
|
if (level == TASK_LEVEL__SOURCE) {
|
||||||
|
streamTaskFillHistoryFinished(pTask);
|
||||||
|
streamTaskEndScanWAL(pTask);
|
||||||
|
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
|
||||||
|
code = streamDoTransferStateToStreamTask(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||||
const char* id) {
|
const char* id) {
|
||||||
int32_t retryTimes = 0;
|
int32_t retryTimes = 0;
|
||||||
|
@ -590,17 +548,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
|
||||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||||
|
|
||||||
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
|
// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
|
||||||
pTask->status.transferState = true;
|
|
||||||
|
|
||||||
int32_t code = streamDispatchTransferStateMsg(pTask);
|
int32_t code = streamDispatchTransferStateMsg(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo handle error
|
// todo handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
// the last execution of fill-history task, in order to transfer task operator states.
|
// 2. do transfer stream task operator states.
|
||||||
code = streamTransferStateToStreamTask(pTask);
|
pTask->status.transferState = true;
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
code = streamDoTransferStateToStreamTask(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -624,9 +581,11 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||||
// fill-history WAL scan has completed
|
// fill-history WAL scan has completed
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
|
if (pTask->status.transferState) {
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
code = streamTransferStateToStreamTask(pTask);
|
||||||
streamTaskEndScanWAL(pTask);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
|
|
|
@ -17,23 +17,30 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
typedef struct SStreamTaskRetryInfo {
|
||||||
|
SStreamMeta* pMeta;
|
||||||
|
int32_t taskId;
|
||||||
|
} SStreamTaskRetryInfo;
|
||||||
|
|
||||||
|
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
|
||||||
static void launchFillHistoryTask(SStreamTask* pTask);
|
static void launchFillHistoryTask(SStreamTask* pTask);
|
||||||
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
|
||||||
|
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
|
||||||
|
|
||||||
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
|
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
pTask->status.downstreamReady = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
|
|
||||||
|
|
||||||
|
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
|
||||||
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
|
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
|
||||||
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
|
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
|
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
||||||
SStreamScanHistoryReq req;
|
SStreamScanHistoryReq req;
|
||||||
streamBuildSourceRecover1Req(pTask, &req, igUntreated);
|
initScanHistoryReq(pTask, &req, igUntreated);
|
||||||
int32_t len = sizeof(SStreamScanHistoryReq);
|
|
||||||
|
|
||||||
|
int32_t len = sizeof(SStreamScanHistoryReq);
|
||||||
void* serializedReq = rpcMallocCont(len);
|
void* serializedReq = rpcMallocCont(len);
|
||||||
if (serializedReq == NULL) {
|
if (serializedReq == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
streamSetParamForScanHistory(pTask);
|
streamSetParamForScanHistory(pTask);
|
||||||
}
|
}
|
||||||
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
|
|
||||||
|
|
||||||
int32_t code = streamStartRecoverTask(pTask, 0);
|
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
|
||||||
|
int32_t code = streamStartScanHistoryAsync(pTask, 0);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||||
|
|
||||||
streamTaskSetForReady(pTask, 0);
|
streamTaskSetReady(pTask, 0);
|
||||||
streamTaskSetRangeStreamCalc(pTask);
|
streamTaskSetRangeStreamCalc(pTask);
|
||||||
streamTaskLaunchScanHistory(pTask);
|
streamTaskLaunchScanHistory(pTask);
|
||||||
|
|
||||||
|
@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
|
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
|
||||||
streamTaskSetForReady(pTask, numOfReqs);
|
streamTaskSetReady(pTask, numOfReqs);
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
int8_t status = pTask->status.taskStatus;
|
int8_t status = pTask->status.taskStatus;
|
||||||
|
@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p
|
||||||
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
|
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
|
int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
|
||||||
pReq->msgHead.vgId = pTask->info.nodeId;
|
pReq->msgHead.vgId = pTask->info.nodeId;
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
pReq->taskId = pTask->id.taskId;
|
pReq->taskId = pTask->id.taskId;
|
||||||
|
@ -524,11 +531,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
|
||||||
streamTaskDoCheckDownstreamTasks(pHTask);
|
streamTaskDoCheckDownstreamTasks(pHTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SStreamTaskRetryInfo {
|
|
||||||
SStreamMeta* pMeta;
|
|
||||||
int32_t taskId;
|
|
||||||
} SStreamTaskRetryInfo;
|
|
||||||
|
|
||||||
static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
||||||
SStreamTaskRetryInfo* pInfo = param;
|
SStreamTaskRetryInfo* pInfo = param;
|
||||||
SStreamMeta* pMeta = pInfo->pMeta;
|
SStreamMeta* pMeta = pInfo->pMeta;
|
||||||
|
@ -638,7 +640,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch recover finish req to all related downstream task
|
// dispatch scan-history finish req to all related downstream task
|
||||||
code = streamDispatchScanHistoryFinishMsg(pTask);
|
code = streamDispatchScanHistoryFinishMsg(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -647,19 +649,9 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask) {
|
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
|
||||||
void* exec = pTask->exec.pExecutor;
|
void* exec = pTask->exec.pExecutor;
|
||||||
return qStreamRecoverScanStep1Finished(exec);
|
return qStreamInfoResetTimewindowFilter(exec);
|
||||||
}
|
|
||||||
|
|
||||||
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask) {
|
|
||||||
void* exec = pTask->exec.pExecutor;
|
|
||||||
return qStreamRecoverScanStep2Finished(exec);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
|
|
||||||
void* exec = pTask->exec.pExecutor;
|
|
||||||
return qStreamRecoverSetAllStepFinished(exec);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
|
@ -669,7 +661,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
int64_t nextStartVer = pRange->maxVer + 1;
|
int64_t nextStartVer = pRange->maxVer + 1;
|
||||||
if (nextStartVer > latestVer - 1) {
|
if (nextStartVer > latestVer - 1) {
|
||||||
// no input data yet. no need to execute the secondardy scan while stream task halt
|
// no input data yet. no need to execute the secondardy scan while stream task halt
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskFillHistoryFinished(pTask);
|
||||||
qDebug(
|
qDebug(
|
||||||
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
|
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
|
||||||
"related stream task currentVer:%" PRId64,
|
"related stream task currentVer:%" PRId64,
|
||||||
|
@ -684,7 +676,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||||
|
@ -857,7 +848,7 @@ void streamTaskPause(SStreamTask* pTask) {
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: use the lock of the task.
|
// todo: use the task lock, stead of meta lock
|
||||||
taosWLockLatch(&pMeta->lock);
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
status = pTask->status.taskStatus;
|
status = pTask->status.taskStatus;
|
||||||
|
@ -871,6 +862,12 @@ void streamTaskPause(SStreamTask* pTask) {
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
// in case of fill-history task, stop the tsdb file scan operation.
|
||||||
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
void* pExecutor = pTask->exec.pExecutor;
|
||||||
|
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - st;
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
|
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
|
||||||
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
|
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
|
||||||
|
|
Loading…
Reference in New Issue