fix(stream): fix error in handling fill history.
This commit is contained in:
parent
4bb78df27c
commit
a19e63fd23
|
@ -195,14 +195,6 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
|
||||||
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||||
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||||
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
|
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
|
||||||
/**
|
|
||||||
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
|
|
||||||
* @param tinfo
|
|
||||||
* @param uid
|
|
||||||
* @param ts
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
|
|
||||||
|
|
||||||
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
|
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
|
@ -225,16 +217,16 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
|
||||||
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
|
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
|
||||||
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
||||||
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
|
||||||
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
|
|
||||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||||
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
|
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
|
||||||
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
||||||
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
|
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
|
||||||
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
|
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
|
||||||
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
|
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
|
||||||
void qStreamCloseTsdbReader(void* task);
|
|
||||||
void resetTaskInfo(qTaskInfo_t tinfo);
|
void resetTaskInfo(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
|
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
|
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
|
||||||
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
|
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
|
||||||
|
|
||||||
|
|
|
@ -267,7 +267,7 @@ typedef struct SCheckpointInfo {
|
||||||
|
|
||||||
typedef struct SStreamStatus {
|
typedef struct SStreamStatus {
|
||||||
int8_t taskStatus;
|
int8_t taskStatus;
|
||||||
int8_t checkDownstream; // downstream tasks are all ready now, if this flag is set
|
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
|
||||||
int8_t schedStatus;
|
int8_t schedStatus;
|
||||||
int8_t keepTaskStatus;
|
int8_t keepTaskStatus;
|
||||||
bool transferState;
|
bool transferState;
|
||||||
|
|
|
@ -817,10 +817,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
SStreamTask task = {0};
|
SStreamTask task = {0};
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
task.id = pTask->streamTaskId;
|
task.id = pTask->streamTaskId;
|
||||||
SStreamMeta meta = {0};
|
|
||||||
task.pMeta = pTask->pMeta;
|
task.pMeta = pTask->pMeta;
|
||||||
pSateTask = &task;
|
pSateTask = &task;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -840,7 +840,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
SStreamTask task = {0};
|
SStreamTask task = {0};
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
task.id = pTask->streamTaskId;
|
task.id = pTask->streamTaskId;
|
||||||
SStreamMeta meta = {0};
|
|
||||||
task.pMeta = pTask->pMeta;
|
task.pMeta = pTask->pMeta;
|
||||||
pSateTask = &task;
|
pSateTask = &task;
|
||||||
}
|
}
|
||||||
|
@ -1100,6 +1099,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
SVersionRange* pRange = NULL;
|
SVersionRange* pRange = NULL;
|
||||||
SStreamTask* pStreamTask = NULL;
|
SStreamTask* pStreamTask = NULL;
|
||||||
|
|
||||||
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
||||||
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
||||||
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
|
||||||
|
@ -1110,7 +1110,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
// wait for the stream task get ready for scan history data
|
// wait for the stream task get ready for scan history data
|
||||||
while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
|
while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) ||
|
||||||
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
|
tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
|
||||||
pStreamTask->info.taskLevel);
|
pStreamTask->info.taskLevel);
|
||||||
|
@ -1136,8 +1136,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pRange->maxVer = ver;
|
pRange->maxVer = ver;
|
||||||
if (pRange->minVer == pRange->maxVer) {
|
if (pRange->minVer == pRange->maxVer) {
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskRecoverSetAllStepFinished(pTask);
|
||||||
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest",
|
tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pId);
|
||||||
pId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1147,31 +1146,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
|
pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr);
|
||||||
|
|
||||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||||
st = taosGetTimestampMs();
|
|
||||||
|
|
||||||
|
st = taosGetTimestampMs();
|
||||||
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
|
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!streamTaskRecoverScanStep2Finished(pTask)) {
|
if(!streamTaskRecoverScanStep2Finished(pTask)) {
|
||||||
|
|
||||||
streamSourceScanHistoryData(pTask);
|
streamSourceScanHistoryData(pTask);
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) {
|
||||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
|
tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskRecoverSetAllStepFinished(pTask);
|
streamTaskRecoverSetAllStepFinished(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
el = (taosGetTimestampMs() - st) / 1000.0;
|
el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
|
tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el);
|
||||||
|
|
||||||
|
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
|
||||||
if (!pTask->status.transferState) {
|
if (!pTask->status.transferState) {
|
||||||
// 3. notify the downstream tasks to transfer executor state after handle all history blocks.
|
|
||||||
pTask->status.transferState = true;
|
|
||||||
code = streamDispatchTransferStateMsg(pTask);
|
code = streamDispatchTransferStateMsg(pTask);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo handle error
|
// todo handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTask->status.transferState = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
||||||
|
@ -1179,12 +1181,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamTryExec(pTask);
|
streamTryExec(pTask);
|
||||||
|
|
||||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||||
|
|
||||||
tqDebug("s-task:%s set status to be dropping", pId);
|
tqDebug("s-task:%s set status to be dropping", pId);
|
||||||
|
|
||||||
// transfer the ownership of executor state
|
// transfer the ownership of executor state
|
||||||
streamTaskReleaseState(pTask);
|
streamTaskReleaseState(pTask);
|
||||||
streamTaskReloadState(pStreamTask);
|
streamTaskReloadState(pStreamTask);
|
||||||
|
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
streamMetaSaveTask(pMeta, pStreamTask);
|
streamMetaSaveTask(pMeta, pStreamTask);
|
||||||
|
|
||||||
|
@ -1405,6 +1407,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
|
// even in halt status, the data in inputQ must be processed
|
||||||
int8_t status = pTask->status.taskStatus;
|
int8_t status = pTask->status.taskStatus;
|
||||||
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
|
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
|
||||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
|
||||||
|
|
|
@ -271,9 +271,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
noDataInWal = false;
|
|
||||||
|
|
||||||
if (pItem != NULL) {
|
if (pItem != NULL) {
|
||||||
|
noDataInWal = false;
|
||||||
code = tAppendDataToInputQueue(pTask, pItem);
|
code = tAppendDataToInputQueue(pTask, pItem);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
|
|
|
@ -68,9 +68,6 @@ typedef struct {
|
||||||
SQueryTableDataCond tableCond;
|
SQueryTableDataCond tableCond;
|
||||||
SVersionRange fillHistoryVer;
|
SVersionRange fillHistoryVer;
|
||||||
STimeWindow fillHistoryWindow;
|
STimeWindow fillHistoryWindow;
|
||||||
// int64_t fillHistoryVer1;
|
|
||||||
// int64_t fillHisotryeKey1;
|
|
||||||
int64_t fillHistoryVer2;
|
|
||||||
SStreamState* pState;
|
SStreamState* pState;
|
||||||
int64_t dataVersion;
|
int64_t dataVersion;
|
||||||
int64_t checkPointId;
|
int64_t checkPointId;
|
||||||
|
|
|
@ -116,6 +116,16 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
|
||||||
clearStreamBlock(pTaskInfo->pRoot);
|
clearStreamBlock(pTaskInfo->pRoot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||||
|
if (pTaskInfo == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s set fill history start key:%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN);
|
||||||
|
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
if (pOperator->numOfDownstream == 0) {
|
if (pOperator->numOfDownstream == 0) {
|
||||||
|
@ -265,6 +275,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
|
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
|
||||||
if (NULL == pTaskInfo->pRoot) {
|
if (NULL == pTaskInfo->pRoot) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -314,8 +325,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SSubplan* pPlan = NULL;
|
SSubplan* pPlan = NULL;
|
||||||
int32_t code = qStringToSubplan(msg, &pPlan);
|
int32_t code = qStringToSubplan(msg, &pPlan);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -907,14 +918,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
|
||||||
pTaskInfo->streamInfo.fillHistoryVer2 = ver;
|
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
|
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||||
|
@ -1056,6 +1059,9 @@ int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
pTaskInfo->streamInfo.recoverStep1Finished = true;
|
pTaskInfo->streamInfo.recoverStep1Finished = true;
|
||||||
pTaskInfo->streamInfo.recoverStep2Finished = true;
|
pTaskInfo->streamInfo.recoverStep2Finished = true;
|
||||||
|
|
||||||
|
// reset the time window
|
||||||
|
pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1051,44 +1051,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qStreamCloseTsdbReader(void* task) {
|
|
||||||
if (task == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
|
||||||
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
|
||||||
|
|
||||||
qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
|
|
||||||
pTaskInfo->streamInfo.currentOffset.ts);
|
|
||||||
|
|
||||||
// todo refactor, other thread may already use this read to extract data.
|
|
||||||
pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
|
|
||||||
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
|
||||||
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
|
||||||
if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
|
||||||
SStreamScanInfo* pInfo = pDownstreamOp->info;
|
|
||||||
if (pInfo->pTableScanOp) {
|
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
|
||||||
|
|
||||||
setOperatorCompleted(pInfo->pTableScanOp);
|
|
||||||
while (pTaskInfo->owner != 0) {
|
|
||||||
taosMsleep(100);
|
|
||||||
qDebug("wait for the reader stopping");
|
|
||||||
}
|
|
||||||
|
|
||||||
pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
|
|
||||||
pTSInfo->base.dataReader = NULL;
|
|
||||||
|
|
||||||
// restore the status, todo refactor.
|
|
||||||
pInfo->pTableScanOp->status = OP_OPENED;
|
|
||||||
pTaskInfo->status = TASK_NOT_COMPLETED;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamOpReleaseState(SOperatorInfo* pOperator) {
|
void streamOpReleaseState(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.releaseStreamStateFn) {
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
|
|
|
@ -1800,8 +1800,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
|
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
|
||||||
} else {
|
} else {
|
||||||
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer + 1;
|
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer;
|
||||||
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
|
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer;
|
||||||
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
|
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
|
||||||
pTSInfo->base.cond.endVersion, id);
|
pTSInfo->base.cond.endVersion, id);
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
||||||
|
@ -1873,7 +1873,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
} else {
|
} else {
|
||||||
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer2);
|
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer.maxVer);
|
||||||
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
|
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2100,6 +2100,8 @@ FETCH_NEXT_BLOCK:
|
||||||
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
|
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
|
||||||
|
|
||||||
if (pWindow->skey != INT64_MIN) {
|
if (pWindow->skey != INT64_MIN) {
|
||||||
|
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
|
||||||
|
|
||||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||||
bool hasUnqualified = false;
|
bool hasUnqualified = false;
|
||||||
|
|
||||||
|
@ -2124,8 +2126,8 @@ FETCH_NEXT_BLOCK:
|
||||||
pBlock->info.dataLoad = 1;
|
pBlock->info.dataLoad = 1;
|
||||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
qDebug("%" PRId64 " rows in datablock, update res:%" PRId64 " %s", pBlockInfo->rows,
|
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows,
|
||||||
pInfo->pUpdateDataRes->info.rows, id);
|
pInfo->pUpdateDataRes->info.rows);
|
||||||
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,9 +121,10 @@ int32_t streamSchedExec(SStreamTask* pTask) {
|
||||||
pRunReq->streamId = pTask->id.streamId;
|
pRunReq->streamId = pTask->id.streamId;
|
||||||
pRunReq->taskId = pTask->id.taskId;
|
pRunReq->taskId = pTask->id.taskId;
|
||||||
|
|
||||||
|
qDebug("trigger to run s-task:%s", pTask->id.idStr);
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
|
||||||
qDebug("trigger to run s-task:%s", pTask->id.idStr);
|
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
|
qDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,7 +332,7 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
// wait for the stream task to be idle
|
// wait for the stream task to be idle
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
|
@ -367,12 +367,12 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the stream task to be idle
|
// wait for the stream task to be idle
|
||||||
waitForTaskTobeIdle(pTask, pStreamTask);
|
waitForTaskIdle(pTask, pStreamTask);
|
||||||
|
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// update the scan data range for source task.
|
// update the scan data range for source task.
|
||||||
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
|
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
||||||
", status:%s, sched-status:%d",
|
", status:%s, sched-status:%d",
|
||||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||||
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||||
|
|
||||||
|
@ -388,6 +388,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
// expand the query time window for stream scanner
|
// expand the query time window for stream scanner
|
||||||
pTimeWindow->skey = INT64_MIN;
|
pTimeWindow->skey = INT64_MIN;
|
||||||
|
qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor);
|
||||||
|
|
||||||
streamSetStatusNormal(pStreamTask);
|
streamSetStatusNormal(pStreamTask);
|
||||||
streamSchedExec(pStreamTask);
|
streamSchedExec(pStreamTask);
|
||||||
|
|
|
@ -438,7 +438,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.checkDownstream == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
|
|
|
@ -97,16 +97,16 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
|
|
||||||
"-%" PRId64,
|
|
||||||
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
|
|
||||||
pWindow->skey, pWindow->ekey);
|
|
||||||
|
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
||||||
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
pTask->checkReqId = req.reqId;
|
pTask->checkReqId = req.reqId;
|
||||||
|
|
||||||
|
qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64
|
||||||
|
"-%" PRId64 ", req:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer,
|
||||||
|
pWindow->skey, pWindow->ekey, req.reqId);
|
||||||
|
|
||||||
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
@ -129,7 +129,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
|
||||||
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pTask->status.checkDownstream = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
|
qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
|
||||||
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
|
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
|
@ -222,8 +222,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the downstream tasks have been checked flag
|
// set the downstream tasks have been checked flag
|
||||||
ASSERT(pTask->status.checkDownstream == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
pTask->status.checkDownstream = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
|
|
||||||
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
|
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
|
@ -286,23 +286,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) {
|
||||||
return streamScanExec(pTask, 100);
|
return streamScanExec(pTask, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
|
|
||||||
void* exec = pTask->exec.pExecutor;
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
|
||||||
qDebug("s-task:%s recover step2(blocking stage) started", id);
|
|
||||||
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = streamScanExec(pTask, 100);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
|
||||||
qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id, el);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
|
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
|
||||||
|
|
||||||
|
@ -373,8 +356,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
|
qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
|
||||||
pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
|
pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
req.taskId = pTask->fixedEpDispatcher.taskId;
|
req.taskId = pTask->fixedEpDispatcher.taskId;
|
||||||
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
|
@ -693,7 +676,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
|
||||||
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pTask->status.checkDownstream == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
// check downstream tasks for itself
|
// check downstream tasks for itself
|
||||||
streamTaskCheckDownstreamTasks(pTask);
|
streamTaskCheckDownstreamTasks(pTask);
|
||||||
|
|
Loading…
Reference in New Issue