Merge pull request #15379 from taosdata/fix/outofblockinsink

fix: fix out of block in data sink issue
This commit is contained in:
dapan1121 2022-07-25 15:03:27 +08:00 committed by GitHub
commit dd7cf53e96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 6 deletions

View File

@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
SSDataBlock *pRes = NULL; SSDataBlock *pRes = NULL;
@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryEnd) { if (queryStop) {
*queryEnd = true; *queryStop = true;
} }
break; break;
@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
if (!qcontinue) { if (!qcontinue) {
if (queryStop) {
*queryStop = true;
}
break; break;
} }
@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false; bool queryStop = false;
do { do {
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8((int8_t *)&ctx->queryInQueue, 0); atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
atomic_store_8((int8_t *)&ctx->queryContinue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore // Note: query is not running anymore
QW_SET_PHASE(ctx, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);