Merge pull request #14606 from taosdata/feature/3_liaohj

fix(query): set results for local query in async APIs.
This commit is contained in:
Haojun Liao 2022-07-06 18:06:48 +08:00 committed by GitHub
commit 65f53993c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 17 deletions

View File

@ -13,6 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_COMMAND_H
#define TDENGINE_COMMAND_H
#include "cmdnodes.h" #include "cmdnodes.h"
#include "tmsg.h" #include "tmsg.h"
#include "plannodes.h" #include "plannodes.h"
@ -27,4 +30,4 @@ int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp); int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx); void qExplainFreeCtx(SExplainCtx *pCtx);
#endif

View File

@ -843,19 +843,25 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
pRequest->body.param = param; pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
if (taos_num_fields(pRequest) == 0) {
// this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return; return;
} }
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { // all data has returned to App already, no need to try again
// All data has returned to App already, no need to try again if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
if (pResultInfo->completed) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return; return;
} }
// it is a local executed query, no need to do async fetch
if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
} }
SSchedulerReq req = { SSchedulerReq req = {
@ -869,14 +875,14 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL); ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res)); ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed // set the current block is all consumed
pResultInfo->current = pResultInfo->numOfRows; pResultInfo->current = pResultInfo->numOfRows;
pResultInfo->convertUcs4 = false;
taos_fetch_rows_a(res, fp, param); taos_fetch_rows_a(res, fp, param);
} }

View File

@ -565,7 +565,6 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
} }
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) { int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
int32_t numOfCols = LIST_LENGTH(pProjects);
blockDataEnsureCapacity(pBlock, 1); blockDataEnsureCapacity(pBlock, 1);
int32_t index = 0; int32_t index = 0;
@ -579,7 +578,6 @@ int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
} }
pBlock->info.rows = 1; pBlock->info.rows = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -13,13 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "catalog.h"
#include "command.h"
#include "query.h" #include "query.h"
#include "schInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
SSchedulerMgmt schMgmt = { SSchedulerMgmt schMgmt = {
.jobRef = -1, .jobRef = -1,