diff --git a/include/client/taos.h b/include/client/taos.h index 5bb528bd7f..b4e5a41ccf 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); +DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param); // Shuduo: temporary enable for app build #if 1 diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 3a6f3badc0..6aa83e9575 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -171,6 +171,7 @@ typedef struct SReqResultInfo { uint32_t current; bool completed; int32_t precision; + bool convertUcs4; int32_t payloadLen; } SReqResultInfo; @@ -222,7 +223,7 @@ typedef struct SSyncQueryParam { SRequestObj* pRequest; } SSyncQueryParam; -void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); +void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 75e1884360..171f06c257 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) { pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampUs(); + pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default + pRequest->type = type; pRequest->pTscObj = pObj; pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e15624bc0d..9025121854 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) { tsem_post(&pParam->sem); } -void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { +void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; @@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc } SSyncQueryParam* pParam = pRequest->body.param; + + // convert ucs4 to native multi-bytes string + pResultInfo->convertUcs4 = convertUcs4; + taos_fetch_rows_a(pRequest, syncFetchFn, pParam); tsem_wait(&pParam->sem); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1a1925e244..5e442c4bf1 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; +#if SYNC_ON_TOP_OF_ASYNC + return doAsyncFetchRows(pRequest, true, true); +#else if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { return NULL; } - -#if SYNC_ON_TOP_OF_ASYNC - return doAsyncFetchRow(pRequest, true, true); -#else return doFetchRows(pRequest, true, true); #endif @@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { if (res == NULL) { return 0; } + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; @@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { } #if SYNC_ON_TOP_OF_ASYNC - doAsyncFetchRow(pRequest, false, true); + doAsyncFetchRows(pRequest, false, true); #else doFetchRows(pRequest, true, true); #endif @@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } +#if SYNC_ON_TOP_OF_ASYNC + doAsyncFetchRows(pRequest, false, false); +#else doFetchRows(pRequest, false, false); +#endif SReqResultInfo *pResultInfo = &pRequest->body.resInfo; @@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { } if (pRequest->code != TSDB_CODE_SUCCESS) { - pRequest->code = code; pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + return; } - pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false); + pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; pRequest->code = code; @@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); } +void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) { + ASSERT(res != NULL && fp != NULL); + SRequestObj *pRequest = res; + pRequest->body.resInfo.convertUcs4 = false; + taos_fetch_rows_a(res, fp, param); +} + TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { // TODO