refactor: do some internal refactor, and add a new api
This commit is contained in:
parent
ed3289e0fa
commit
840074dfa3
|
@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
|
||||||
|
|
||||||
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
|
||||||
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||||
|
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param);
|
||||||
|
|
||||||
// Shuduo: temporary enable for app build
|
// Shuduo: temporary enable for app build
|
||||||
#if 1
|
#if 1
|
||||||
|
|
|
@ -171,6 +171,7 @@ typedef struct SReqResultInfo {
|
||||||
uint32_t current;
|
uint32_t current;
|
||||||
bool completed;
|
bool completed;
|
||||||
int32_t precision;
|
int32_t precision;
|
||||||
|
bool convertUcs4;
|
||||||
int32_t payloadLen;
|
int32_t payloadLen;
|
||||||
} SReqResultInfo;
|
} SReqResultInfo;
|
||||||
|
|
||||||
|
@ -222,7 +223,7 @@ typedef struct SSyncQueryParam {
|
||||||
SRequestObj* pRequest;
|
SRequestObj* pRequest;
|
||||||
} SSyncQueryParam;
|
} SSyncQueryParam;
|
||||||
|
|
||||||
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
||||||
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
||||||
|
|
||||||
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
|
||||||
|
|
|
@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) {
|
||||||
pRequest->requestId = generateRequestId();
|
pRequest->requestId = generateRequestId();
|
||||||
pRequest->metric.start = taosGetTimestampUs();
|
pRequest->metric.start = taosGetTimestampUs();
|
||||||
|
|
||||||
|
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
|
||||||
|
|
||||||
pRequest->type = type;
|
pRequest->type = type;
|
||||||
pRequest->pTscObj = pObj;
|
pRequest->pTscObj = pObj;
|
||||||
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
|
||||||
|
|
|
@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
|
||||||
tsem_post(&pParam->sem);
|
tsem_post(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncQueryParam* pParam = pRequest->body.param;
|
SSyncQueryParam* pParam = pRequest->body.param;
|
||||||
|
|
||||||
|
// convert ucs4 to native multi-bytes string
|
||||||
|
pResultInfo->convertUcs4 = convertUcs4;
|
||||||
|
|
||||||
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
|
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
|
||||||
tsem_wait(&pParam->sem);
|
tsem_wait(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
|
@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
|
|
||||||
if (TD_RES_QUERY(res)) {
|
if (TD_RES_QUERY(res)) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
|
return doAsyncFetchRows(pRequest, true, true);
|
||||||
|
#else
|
||||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
|
||||||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if SYNC_ON_TOP_OF_ASYNC
|
|
||||||
return doAsyncFetchRow(pRequest, true, true);
|
|
||||||
#else
|
|
||||||
return doFetchRows(pRequest, true, true);
|
return doFetchRows(pRequest, true, true);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TD_RES_QUERY(res)) {
|
if (TD_RES_QUERY(res)) {
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
|
|
||||||
|
@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#if SYNC_ON_TOP_OF_ASYNC
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
doAsyncFetchRow(pRequest, false, true);
|
doAsyncFetchRows(pRequest, false, true);
|
||||||
#else
|
#else
|
||||||
doFetchRows(pRequest, true, true);
|
doFetchRows(pRequest, true, true);
|
||||||
#endif
|
#endif
|
||||||
|
@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if SYNC_ON_TOP_OF_ASYNC
|
||||||
|
doAsyncFetchRows(pRequest, false, false);
|
||||||
|
#else
|
||||||
doFetchRows(pRequest, false, false);
|
doFetchRows(pRequest, false, false);
|
||||||
|
#endif
|
||||||
|
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
|
@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pRequest->code = code;
|
|
||||||
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false);
|
pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
|
@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
|
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
|
||||||
|
ASSERT(res != NULL && fp != NULL);
|
||||||
|
SRequestObj *pRequest = res;
|
||||||
|
pRequest->body.resInfo.convertUcs4 = false;
|
||||||
|
taos_fetch_rows_a(res, fp, param);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
|
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
|
||||||
void *param, int interval) {
|
void *param, int interval) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
Loading…
Reference in New Issue