Merge pull request #19277 from taosdata/fix/qwInvalidMsgOrder
fix: invalid msg order issue
This commit is contained in:
commit
2a15e52bb9
|
@ -681,6 +681,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
bool queryStop = false;
|
bool queryStop = false;
|
||||||
|
bool qComplete = false;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
ctx = NULL;
|
ctx = NULL;
|
||||||
|
@ -705,11 +706,12 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rsp) {
|
if (rsp) {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||||
if (qComplete) {
|
if (qComplete) {
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
|
atomic_store_8((int8_t *)&ctx->queryContinue, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
|
@ -743,8 +745,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code ||
|
if (qComplete || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) {
|
||||||
0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
|
|
||||||
// Note: query is not running anymore
|
// Note: query is not running anymore
|
||||||
QW_SET_PHASE(ctx, 0);
|
QW_SET_PHASE(ctx, 0);
|
||||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
Loading…
Reference in New Issue