enh: optimize drop when scan exec
This commit is contained in:
parent
1f9786886b
commit
66b9fa01f7
|
@ -1112,6 +1112,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// do recovery step 1
|
// do recovery step 1
|
||||||
streamSourceRecoverScanStep1(pTask);
|
streamSourceRecoverScanStep1(pTask);
|
||||||
|
|
||||||
|
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// build msg to launch next step
|
// build msg to launch next step
|
||||||
SStreamRecoverStep2Req req;
|
SStreamRecoverStep2Req req;
|
||||||
code = streamBuildSourceRecover2Req(pTask, &req);
|
code = streamBuildSourceRecover2Req(pTask, &req);
|
||||||
|
@ -1122,6 +1127,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
|
||||||
|
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// serialize msg
|
// serialize msg
|
||||||
int32_t len = sizeof(SStreamRecoverStep1Req);
|
int32_t len = sizeof(SStreamRecoverStep1Req);
|
||||||
|
|
||||||
|
@ -1160,6 +1169,11 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// restore param
|
// restore param
|
||||||
code = streamRestoreParam(pTask);
|
code = streamRestoreParam(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
|
@ -110,10 +110,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
|
|
||||||
int32_t batchCnt = 0;
|
int32_t batchCnt = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* output = NULL;
|
SSDataBlock* output = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
if (qExecTask(exec, &output, &ts) < 0) {
|
||||||
ASSERT(0);
|
return -1;
|
||||||
}
|
}
|
||||||
if (output == NULL) {
|
if (output == NULL) {
|
||||||
if (qStreamRecoverScanFinished(exec)) {
|
if (qStreamRecoverScanFinished(exec)) {
|
||||||
|
|
|
@ -200,7 +200,6 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
|
||||||
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
|
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
|
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
return streamScanExec(pTask, 100);
|
return streamScanExec(pTask, 100);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue