[td-13039] support taos_fetch_block
This commit is contained in:
parent
f036052e74
commit
2a6262f9ad
|
@ -235,7 +235,7 @@ void initMsgHandleFp();
|
||||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||||
uint16_t port);
|
uint16_t port);
|
||||||
|
|
||||||
void* doFetchRow(SRequestObj* pRequest);
|
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr);
|
||||||
|
|
||||||
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
||||||
|
|
||||||
|
|
|
@ -545,7 +545,42 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c
|
||||||
return taos_connect(ipStr, userStr, passStr, dbStr, port);
|
return taos_connect(ipStr, userStr, passStr, dbStr, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* doFetchRow(SRequestObj* pRequest) {
|
static void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
|
||||||
|
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
||||||
|
SResultColumn* pCol = &pResultInfo->pCol[i];
|
||||||
|
|
||||||
|
int32_t type = pResultInfo->fields[i].type;
|
||||||
|
int32_t bytes = pResultInfo->fields[i].bytes;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
|
if (pCol->offset[pResultInfo->current] != -1) {
|
||||||
|
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
|
||||||
|
|
||||||
|
pResultInfo->length[i] = varDataLen(pStart);
|
||||||
|
pResultInfo->row[i] = varDataVal(pStart);
|
||||||
|
|
||||||
|
if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(pResultInfo->convertBuf[i]));
|
||||||
|
ASSERT(len <= bytes);
|
||||||
|
|
||||||
|
pResultInfo->row[i] = varDataVal(pResultInfo->convertBuf[i]);
|
||||||
|
varDataSetLen(pResultInfo->convertBuf[i], len);
|
||||||
|
pResultInfo->length[i] = len;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pResultInfo->row[i] = NULL;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
|
||||||
|
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
|
||||||
|
} else {
|
||||||
|
pResultInfo->row[i] = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
|
@ -555,17 +590,20 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
if (pRequest->type == TDMT_VND_QUERY) {
|
if (pRequest->type == TDMT_VND_QUERY) {
|
||||||
// All data has returned to App already, no need to try again
|
// All data has returned to App already, no need to try again
|
||||||
if (pResultInfo->completed) {
|
if (pResultInfo->completed) {
|
||||||
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
|
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
|
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -633,41 +671,11 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
if (setupOneRowPtr) {
|
||||||
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
doSetOneRowPtr(pResultInfo);
|
||||||
SResultColumn* pCol = &pResultInfo->pCol[i];
|
|
||||||
|
|
||||||
int32_t type = pResultInfo->fields[i].type;
|
|
||||||
int32_t bytes = pResultInfo->fields[i].bytes;
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
if (pCol->offset[pResultInfo->current] != -1) {
|
|
||||||
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
|
|
||||||
|
|
||||||
pResultInfo->length[i] = varDataLen(pStart);
|
|
||||||
pResultInfo->row[i] = varDataVal(pStart);
|
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(pResultInfo->convertBuf[i]));
|
|
||||||
ASSERT(len <= bytes);
|
|
||||||
|
|
||||||
pResultInfo->row[i] = varDataVal(pResultInfo->convertBuf[i]);
|
|
||||||
varDataSetLen(pResultInfo->convertBuf[i], len);
|
|
||||||
pResultInfo->length[i] = len;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pResultInfo->row[i] = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
|
|
||||||
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
|
|
||||||
} else {
|
|
||||||
pResultInfo->row[i] = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pResultInfo->current += 1;
|
pResultInfo->current += 1;
|
||||||
|
}
|
||||||
|
|
||||||
return pResultInfo->row;
|
return pResultInfo->row;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -138,20 +138,20 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
|
||||||
return taos_query_l(taos, sql, (int32_t) strlen(sql));
|
return taos_query_l(taos, sql, (int32_t) strlen(sql));
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
|
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
if (pRes == NULL) {
|
if (res == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj *pRequest = (SRequestObj *) pRes;
|
SRequestObj *pRequest = (SRequestObj *) res;
|
||||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||||
pRequest->type == TSDB_SQL_INSERT ||
|
pRequest->type == TSDB_SQL_INSERT ||
|
||||||
pRequest->code != TSDB_CODE_SUCCESS ||
|
pRequest->code != TSDB_CODE_SUCCESS ||
|
||||||
taos_num_fields(pRes) == 0) {
|
taos_num_fields(res) == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return doFetchRow(pRequest);
|
return doFetchRow(pRequest, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
|
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
|
||||||
|
@ -330,9 +330,28 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
|
||||||
|
if (res == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRequestObj *pRequest = (SRequestObj *) res;
|
||||||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||||
|
pRequest->type == TSDB_SQL_INSERT ||
|
||||||
|
pRequest->code != TSDB_CODE_SUCCESS ||
|
||||||
|
taos_num_fields(res) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
doFetchRow(pRequest, false);
|
||||||
|
|
||||||
|
// TODO refactor
|
||||||
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
pResultInfo->current = pResultInfo->numOfRows;
|
||||||
|
*rows = pResultInfo->row;
|
||||||
|
|
||||||
|
return pResultInfo->numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) {
|
int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue