Merge pull request #11392 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
a3be085135
|
@ -238,9 +238,9 @@ void initMsgHandleFp();
|
||||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||||
uint16_t port);
|
uint16_t port);
|
||||||
|
|
||||||
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr);
|
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
|
||||||
|
|
||||||
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
|
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4);
|
||||||
|
|
||||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
||||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
||||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
||||||
static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
|
static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
|
||||||
|
|
||||||
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
||||||
if (str == NULL) {
|
if (str == NULL) {
|
||||||
|
@ -176,7 +176,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
SRetrieveTableRsp* pRsp = NULL;
|
SRetrieveTableRsp* pRsp = NULL;
|
||||||
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
|
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
|
||||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp);
|
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -616,7 +616,7 @@ static void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
|
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
|
@ -637,7 +637,7 @@ void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
|
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -735,7 +735,42 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
|
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
int32_t type = pResultInfo->fields[i].type;
|
||||||
|
int32_t bytes = pResultInfo->fields[i].bytes;
|
||||||
|
|
||||||
|
if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
|
||||||
|
if (p == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResultInfo->convertBuf[i] = p;
|
||||||
|
|
||||||
|
SResultColumn* pCol = &pResultInfo->pCol[i];
|
||||||
|
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||||
|
if (pCol->offset[j] != -1) {
|
||||||
|
char* pStart = pCol->offset[j] + pCol->pData;
|
||||||
|
|
||||||
|
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
|
||||||
|
ASSERT(len <= bytes);
|
||||||
|
|
||||||
|
varDataSetLen(p, len);
|
||||||
|
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
|
||||||
|
p += (len + VARSTR_HEADER_SIZE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
|
||||||
|
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4) {
|
||||||
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
|
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
|
||||||
if (numOfRows == 0) {
|
if (numOfRows == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -767,37 +802,11 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// convert UCS4-LE encoded character to native multi-bytes character in current data block.
|
// convert UCS4-LE encoded character to native multi-bytes character in current data block.
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
if (convertUcs4) {
|
||||||
int32_t type = pResultInfo->fields[i].type;
|
code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
|
||||||
int32_t bytes = pResultInfo->fields[i].bytes;
|
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
|
|
||||||
if (p == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
pResultInfo->convertBuf[i] = p;
|
|
||||||
|
|
||||||
SResultColumn* pCol = &pResultInfo->pCol[i];
|
|
||||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
|
||||||
if (pCol->offset[j] != -1) {
|
|
||||||
pStart = pCol->offset[j] + pCol->pData;
|
|
||||||
|
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
|
|
||||||
ASSERT(len <= bytes);
|
|
||||||
|
|
||||||
varDataSetLen(p, len);
|
|
||||||
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
|
|
||||||
p += (len + VARSTR_HEADER_SIZE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
|
|
||||||
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* getDbOfConnection(STscObj* pObj) {
|
char* getDbOfConnection(STscObj* pObj) {
|
||||||
|
@ -829,7 +838,7 @@ void resetConnectDB(STscObj* pTscObj) {
|
||||||
taosThreadMutexUnlock(&pTscObj->mutex);
|
taosThreadMutexUnlock(&pTscObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
|
||||||
assert(pResultInfo != NULL && pRsp != NULL);
|
assert(pResultInfo != NULL && pRsp != NULL);
|
||||||
|
|
||||||
pResultInfo->pRspMsg = (const char*)pRsp;
|
pResultInfo->pRspMsg = (const char*)pRsp;
|
||||||
|
@ -842,5 +851,5 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
|
|
||||||
// TODO handle the compressed case
|
// TODO handle the compressed case
|
||||||
pResultInfo->totalRows += pResultInfo->numOfRows;
|
pResultInfo->totalRows += pResultInfo->numOfRows;
|
||||||
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
|
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return doFetchRow(pRequest, true);
|
return doFetchRow(pRequest, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
|
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
|
||||||
|
@ -404,7 +404,7 @@ int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
doFetchRow(pRequest, false);
|
doFetchRow(pRequest, false, true);
|
||||||
|
|
||||||
// TODO refactor
|
// TODO refactor
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
|
@ -426,7 +426,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
doFetchRow(pRequest, false);
|
doFetchRow(pRequest, false, false);
|
||||||
|
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
|
||||||
pResInfo->completed = pRetrieve->completed;
|
pResInfo->completed = pRetrieve->completed;
|
||||||
|
|
||||||
pResInfo->current = 0;
|
pResInfo->current = 0;
|
||||||
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
|
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
|
||||||
pRetrieve->completed, pRequest->body.showInfo.execId);
|
pRetrieve->completed, pRequest->body.showInfo.execId);
|
||||||
|
@ -225,7 +225,7 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pResInfo->pData = pFetchRsp->data;
|
pResInfo->pData = pFetchRsp->data;
|
||||||
|
|
||||||
pResInfo->current = 0;
|
pResInfo->current = 0;
|
||||||
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
|
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
|
||||||
pFetchRsp->completed, pRequest->body.showInfo.execId);
|
pFetchRsp->completed, pRequest->body.showInfo.execId);
|
||||||
|
|
|
@ -1748,7 +1748,6 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
||||||
SFilterField* fi = right;
|
SFilterField* fi = right;
|
||||||
|
|
||||||
SValueNode* var = (SValueNode *)fi->desc;
|
SValueNode* var = (SValueNode *)fi->desc;
|
||||||
|
|
||||||
if (var == NULL) {
|
if (var == NULL) {
|
||||||
assert(fi->data != NULL);
|
assert(fi->data != NULL);
|
||||||
continue;
|
continue;
|
||||||
|
@ -1767,13 +1766,18 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataType *dType = &var->node.resType;
|
SDataType *dType = &var->node.resType;
|
||||||
|
size_t bytes = 0;
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||||
size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE;
|
size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE;
|
||||||
fi->data = taosMemoryCalloc(1, len + 1 + VARSTR_HEADER_SIZE);
|
bytes = len + 1 + VARSTR_HEADER_SIZE;
|
||||||
|
|
||||||
|
fi->data = taosMemoryCalloc(1, bytes);
|
||||||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE;
|
size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE;
|
||||||
fi->data = taosMemoryCalloc(1, (len + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
|
bytes = (len + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
|
||||||
|
|
||||||
|
fi->data = taosMemoryCalloc(1, bytes);
|
||||||
} else if (type != TSDB_DATA_TYPE_JSON){
|
} else if (type != TSDB_DATA_TYPE_JSON){
|
||||||
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE
|
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE
|
||||||
/*
|
/*
|
||||||
|
@ -1797,8 +1801,11 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
||||||
} else {
|
} else {
|
||||||
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
|
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
|
||||||
out.columnData->info.type = type;
|
out.columnData->info.type = type;
|
||||||
out.columnData->info.bytes = tDataTypes[type].bytes;
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
ASSERT(!IS_VAR_DATA_TYPE(type));
|
out.columnData->info.bytes = bytes;
|
||||||
|
} else {
|
||||||
|
out.columnData->info.bytes = tDataTypes[type].bytes;
|
||||||
|
}
|
||||||
|
|
||||||
// todo refactor the convert
|
// todo refactor the convert
|
||||||
int32_t code = doConvertDataType(var, &out);
|
int32_t code = doConvertDataType(var, &out);
|
||||||
|
@ -2985,13 +2992,13 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||||
if (colData == NULL || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)) {
|
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
|
||||||
(*p)[i] = 0;
|
(*p)[i] = 0;
|
||||||
all = false;
|
all = false;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// match/nmatch for nchar type need convert from ucs4 to mbs
|
|
||||||
|
|
||||||
|
// match/nmatch for nchar type need convert from ucs4 to mbs
|
||||||
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){
|
if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){
|
||||||
char *newColData = taosMemoryCalloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
|
char *newColData = taosMemoryCalloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
|
||||||
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData));
|
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData));
|
||||||
|
@ -3222,19 +3229,13 @@ int32_t fltInitFromNode(SNode* tree, SFilterInfo *info, uint32_t options) {
|
||||||
info->unitFlags = taosMemoryMalloc(info->unitNum * sizeof(*info->unitFlags));
|
info->unitFlags = taosMemoryMalloc(info->unitNum * sizeof(*info->unitFlags));
|
||||||
|
|
||||||
filterDumpInfoToString(info, "Final", 0);
|
filterDumpInfoToString(info, "Final", 0);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
qInfo("init from node failed, code:%d", code);
|
qInfo("init from node failed, code:%d", code);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
|
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) {
|
||||||
if (FILTER_EMPTY_RES(info)) {
|
if (FILTER_EMPTY_RES(info)) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -46,7 +46,6 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
|
||||||
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
|
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
|
||||||
|
|
||||||
colInfoDataEnsureCapacity(out->columnData, 1);
|
colInfoDataEnsureCapacity(out->columnData, 1);
|
||||||
|
|
||||||
int32_t code = vectorConvertImpl(&in, out);
|
int32_t code = vectorConvertImpl(&in, out);
|
||||||
sclFreeParam(&in);
|
sclFreeParam(&in);
|
||||||
|
|
||||||
|
|
|
@ -177,9 +177,24 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam* pOut, int32_t rowInd
|
||||||
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*) &v);
|
colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*) &v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIndex) {
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t inputLen = varDataLen(buf);
|
||||||
|
|
||||||
|
char* t = taosMemoryCalloc(1,(inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
|
||||||
|
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), pOut->columnData->info.bytes, &len);
|
||||||
|
varDataSetLen(t, len);
|
||||||
|
|
||||||
|
colDataAppend(pOut->columnData, rowIndex, t, false);
|
||||||
|
taosMemoryFree(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO opt performance, tmp is not needed.
|
||||||
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
|
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
|
||||||
int32_t bufSize = pIn->columnData->info.bytes;
|
int32_t bufSize = pIn->columnData->info.bytes;
|
||||||
char *tmp = taosMemoryMalloc(bufSize);
|
char *tmp = taosMemoryMalloc(bufSize + VARSTR_HEADER_SIZE);
|
||||||
|
|
||||||
|
bool vton = false;
|
||||||
|
|
||||||
_bufConverteFunc func = NULL;
|
_bufConverteFunc func = NULL;
|
||||||
if (TSDB_DATA_TYPE_BOOL == outType) {
|
if (TSDB_DATA_TYPE_BOOL == outType) {
|
||||||
|
@ -190,6 +205,9 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
||||||
func = varToUnsigned;
|
func = varToUnsigned;
|
||||||
} else if (IS_FLOAT_TYPE(outType)) {
|
} else if (IS_FLOAT_TYPE(outType)) {
|
||||||
func = varToFloat;
|
func = varToFloat;
|
||||||
|
} else if (outType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
func = varToNchar;
|
||||||
|
vton = true;
|
||||||
} else {
|
} else {
|
||||||
sclError("invalid convert outType:%d", outType);
|
sclError("invalid convert outType:%d", outType);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
@ -197,26 +215,30 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
||||||
|
|
||||||
pOut->numOfRows = pIn->numOfRows;
|
pOut->numOfRows = pIn->numOfRows;
|
||||||
for (int32_t i = 0; i < pIn->numOfRows; ++i) {
|
for (int32_t i = 0; i < pIn->numOfRows; ++i) {
|
||||||
if (colDataIsNull(pIn->columnData, pIn->numOfRows, i, NULL)) {
|
if (colDataIsNull_s(pIn->columnData, i)) {
|
||||||
colDataAppendNULL(pOut->columnData, i);
|
colDataAppendNULL(pOut->columnData, i);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* data = colDataGetData(pIn->columnData, i);
|
char* data = colDataGetData(pIn->columnData, i);
|
||||||
if (TSDB_DATA_TYPE_BINARY == inType) {
|
if (vton) {
|
||||||
memcpy(tmp, varDataVal(data), varDataLen(data));
|
memcpy(tmp, data, varDataTLen(data));
|
||||||
tmp[varDataLen(data)] = 0;
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT (varDataLen(data) <= bufSize);
|
if (TSDB_DATA_TYPE_VARCHAR == inType) {
|
||||||
|
memcpy(tmp, varDataVal(data), varDataLen(data));
|
||||||
int len = taosUcs4ToMbs((TdUcs4*)varDataVal(data), varDataLen(data), tmp);
|
tmp[varDataLen(data)] = 0;
|
||||||
if (len < 0){
|
} else {
|
||||||
sclError("castConvert taosUcs4ToMbs error 1");
|
ASSERT(varDataLen(data) <= bufSize);
|
||||||
taosMemoryFreeClear(tmp);
|
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(data), varDataLen(data), tmp);
|
||||||
|
if (len < 0) {
|
||||||
|
sclError("castConvert taosUcs4ToMbs error 1");
|
||||||
|
taosMemoryFreeClear(tmp);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmp[len] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp[len] = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(*func)(tmp, pOut, i);
|
(*func)(tmp, pOut, i);
|
||||||
|
|
Loading…
Reference in New Issue