fix(decimal): fix decimal taos_fetch_fields_e api

This commit is contained in:
wangjiaming0909 2025-03-11 15:28:52 +08:00
parent bb87a5b0b7
commit ef613fb702
12 changed files with 55 additions and 47 deletions

View File

@ -423,8 +423,8 @@ uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod);
void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale);
void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale);
int32_t calcTypeBytesFromSchemaBytes(int32_t type, int32_t schemaBytes);
int32_t calcSchemaBytesFromTypeBytes(int32_t type, int32_t varTypeBytes);
int32_t calcTypeBytesFromSchemaBytes(int32_t type, int32_t schemaBytes, bool isStmt);
int32_t calcSchemaBytesFromTypeBytes(int32_t type, int32_t varTypeBytes, bool isStmt);
#ifdef __cplusplus
}

View File

@ -312,11 +312,11 @@ typedef struct SSyncQueryParam {
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void doSetOneRowPtr(SReqResultInfo* pResultInfo, bool isStmt);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4);
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool isStmt);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt);
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema, bool isStmt);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);

View File

@ -315,7 +315,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
code = qParseSql(&cxt, pQuery);
if (TSDB_CODE_SUCCESS == code) {
if ((*pQuery)->haveResultSet) {
code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema);
code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema, pRequest->isStmtBind);
setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
}
}
@ -337,7 +337,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
int8_t biMode = atomic_load_8(&pRequest->pTscObj->biMode);
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pRequest->pTscObj->optionInfo.charsetCxt);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
}
return code;
@ -375,7 +375,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
}
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
@ -517,7 +517,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema) {
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema, bool isStmt) {
if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
return TSDB_CODE_INVALID_PARA;
@ -546,8 +546,9 @@ int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32
pResInfo->fields[i].type = pSchema[i].type;
pResInfo->userFields[i].type = pSchema[i].type;
pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes);
pResInfo->fields[i].bytes = pResInfo->userFields[i].bytes;
// userFields must convert to type bytes, no matter isStmt or not
pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
}
@ -1940,12 +1941,12 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
// return taos_connect(ipStr, userStr, passStr, dbStr, port);
// }
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
void doSetOneRowPtr(SReqResultInfo* pResultInfo, bool isStmt) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
SResultColumn* pCol = &pResultInfo->pCol[i];
int32_t type = pResultInfo->fields[i].type;
int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->fields[i].bytes);
int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->fields[i].bytes, isStmt);
if (IS_VAR_DATA_TYPE(type)) {
if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
@ -1992,7 +1993,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
pRequest->code =
setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4, pRequest->isStmtBind);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
@ -2011,7 +2012,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
if (setupOneRowPtr) {
doSetOneRowPtr(pResultInfo);
doSetOneRowPtr(pResultInfo, pRequest->isStmtBind);
pResultInfo->current += 1;
}
@ -2058,7 +2059,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return NULL;
} else {
if (setupOneRowPtr) {
doSetOneRowPtr(pResultInfo);
doSetOneRowPtr(pResultInfo, pRequest->isStmtBind);
pResultInfo->current += 1;
}
@ -2085,14 +2086,14 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength) {
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
int32_t idx = -1;
iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
int32_t schemaBytes = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes);
int32_t schemaBytes = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
@ -2393,7 +2394,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
return TSDB_CODE_SUCCESS;
}
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
tscError("setResultDataPtr paras error");
return TSDB_CODE_TSC_INTERNAL_ERROR;
@ -2486,7 +2487,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
}
pResultInfo->pCol[i].pData = pStart;
pResultInfo->length[i] = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes);
pResultInfo->length[i] = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
pStart += colLength[i];
@ -2502,7 +2503,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
}
if (convertUcs4) {
code = doConvertUCS4(pResultInfo, colLength);
code = doConvertUCS4(pResultInfo, colLength, isStmt);
}
if (TSDB_CODE_SUCCESS == code && convertUcs4) {
code = convertDecimalType(pResultInfo);
@ -2547,7 +2548,7 @@ void resetConnectDB(STscObj* pTscObj) {
(void)taosThreadMutexUnlock(&pTscObj->mutex);
}
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool isStmt) {
if (pResultInfo == NULL || pRsp == NULL) {
tscError("setQueryResultFromRsp paras is null");
return TSDB_CODE_TSC_INTERNAL_ERROR;
@ -2616,7 +2617,7 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows;
int32_t code = setResultDataPtr(pResultInfo, convertUcs4);
int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
return code;
}
@ -3079,7 +3080,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
}
pRequest->code =
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4);
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, pRequest->isStmtBind);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),

View File

@ -638,7 +638,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
if (pResultInfo->current < pResultInfo->numOfRows) {
doSetOneRowPtr(pResultInfo);
doSetOneRowPtr(pResultInfo, false);
pResultInfo->current += 1;
return pResultInfo->row;
} else {
@ -646,7 +646,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
}
doSetOneRowPtr(pResultInfo);
doSetOneRowPtr(pResultInfo, false);
pResultInfo->current += 1;
return pResultInfo->row;
}
@ -1276,7 +1276,7 @@ void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta
}
if (pQuery->haveResultSet) {
code = setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols, pQuery->pResExtSchema);
code = setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols, pQuery->pResExtSchema, pRequest->isStmtBind);
setResPrecision(&pRequest->body.resInfo, pQuery->precision);
}
}

View File

@ -680,7 +680,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = buildShowVariablesRsp(rsp.variables, &pRes);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->isStmtBind);
}
if (code != 0) {
@ -835,7 +835,7 @@ int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
}
if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->isStmtBind);
}
if (code != 0) {

View File

@ -1270,7 +1270,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
if (pStmt->sql.pQuery->haveResultSet) {
STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema));
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
@ -1862,7 +1862,7 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
}
*type = pField[idx].type;
*bytes = calcSchemaBytesFromTypeBytes(pField[idx].type, pField[idx].bytes);
*bytes = calcSchemaBytesFromTypeBytes(pField[idx].type, pField[idx].bytes, true);
_return:

View File

@ -1385,7 +1385,7 @@ int stmtBindBatch2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* bind, int32_t colIdx) {
if (pStmt->sql.pQuery->haveResultSet) {
STMT_ERR_RET(setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema));
pStmt->sql.pQuery->numOfResCols, pStmt->sql.pQuery->pResExtSchema, true));
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
taosMemoryFreeClear(pStmt->sql.pQuery->pResExtSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);

View File

@ -3012,7 +3012,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter);
if (pSW) {
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL));
TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL, false));
}
}
@ -3028,7 +3028,7 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes
pRspObj->resInfo.precision = precision;
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4);
int32_t code = setResultDataPtr(&pRspObj->resInfo, convertUcs4, false);
if (code != 0) {
return code;
}

View File

@ -293,7 +293,8 @@ void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_
*pBytes >>= 24;
}
int32_t calcTypeBytesFromSchemaBytes(int32_t type, int32_t schemaBytes) {
int32_t calcTypeBytesFromSchemaBytes(int32_t type, int32_t schemaBytes, bool isStmt) {
if (isStmt) return schemaBytes;
if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
return schemaBytes - VARSTR_HEADER_SIZE;
} else if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_JSON) {
@ -302,7 +303,8 @@ int32_t calcTypeBytesFromSchemaBytes(int32_t type, int32_t schemaBytes) {
return schemaBytes;
}
int32_t calcSchemaBytesFromTypeBytes(int32_t type, int32_t varTypeBytes) {
int32_t calcSchemaBytesFromTypeBytes(int32_t type, int32_t varTypeBytes, bool isStmt) {
if (isStmt) return varTypeBytes;
if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
return varTypeBytes + VARSTR_HEADER_SIZE;
} else if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_JSON) {

View File

@ -293,8 +293,15 @@ typedef struct {
int8_t dirty;
struct {
int16_t cid;
int8_t type;
int8_t flag;
SValue value;
union {
int64_t val;
struct {
uint32_t nData;
uint8_t *pData;
};
} value;
} colVal;
} SLastColV0;
@ -306,7 +313,7 @@ static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
pLastCol->dirty = pLastColV0->dirty;
pLastCol->colVal.cid = pLastColV0->colVal.cid;
pLastCol->colVal.flag = pLastColV0->colVal.flag;
pLastCol->colVal.value.type = pLastColV0->colVal.value.type;
pLastCol->colVal.value.type = pLastColV0->colVal.type;
pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
@ -406,7 +413,7 @@ static int32_t tsdbCacheSerializeV0(char const *value, SLastCol *pLastCol) {
pLastColV0->dirty = pLastCol->dirty;
pLastColV0->colVal.cid = pLastCol->colVal.cid;
pLastColV0->colVal.flag = pLastCol->colVal.flag;
pLastColV0->colVal.value.type = pLastCol->colVal.value.type;
pLastColV0->colVal.type = pLastCol->colVal.value.type;
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
pLastColV0->colVal.value.nData = pLastCol->colVal.value.nData;
if (pLastCol->colVal.value.nData > 0) {

View File

@ -1052,7 +1052,7 @@ int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSc
schema = &pSchema[boundColumns[i]];
tstrncpy((*fields)[i].name, schema->name, 65);
(*fields)[i].type = schema->type;
(*fields)[i].bytes = calcTypeBytesFromSchemaBytes(schema->type, schema->bytes);
(*fields)[i].bytes = calcTypeBytesFromSchemaBytes(schema->type, schema->bytes, true);
}
}

View File

@ -960,9 +960,9 @@ class DecimalFunction(DecimalColumnExpr):
DecimalSumFunction(),
DecimalAvgFunction(),
DecimalCountFunction(),
#DecimalLastRowFunction(),
#DecimalLastFunction(),
#DecimalFirstFunction(),
DecimalLastRowFunction(),
DecimalLastFunction(),
DecimalFirstFunction(),
]
def check_results(self, query_col_res: List) -> bool:
@ -2296,9 +2296,7 @@ class TDTestCase:
self.norm_tb_columns,
DecimalFunction.get_decimal_agg_funcs,
)
self.test_decimal_agg_funcs(
self.db_name, self.stable_name, self.stb_columns, DecimalFunction.get_decimal_agg_funcs
)
##self.test_decimal_agg_funcs( self.db_name, self.stable_name, self.stb_columns, DecimalFunction.get_decimal_agg_funcs)
self.test_decimal_cast_func(self.db_name, self.norm_table_name, self.norm_tb_columns)
def test_query_decimal(self):