Merge pull request #21963 from luckeverda/fix/TD-24473-new

fix/TD-24473
This commit is contained in:
wade zhang 2023-07-11 09:32:25 +08:00 committed by GitHub
commit 13bc1e744e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 138 additions and 77 deletions

View File

@ -41,7 +41,7 @@ typedef struct SFilterColumnParam {
} SFilterColumnParam; } SFilterColumnParam;
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options); extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, extern int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis,
int16_t numOfCols, int32_t *pFilterResStatus); int16_t numOfCols, int32_t *pFilterResStatus);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);

View File

@ -765,6 +765,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201) #define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201)
//scalar
#define TSDB_CODE_SCALAR_CONVERT_ERROR TAOS_DEF_ERROR_CODE(0, 0x3250)
//tmq //tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)

View File

@ -619,7 +619,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de
extern void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);

View File

@ -77,8 +77,7 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol); bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
@ -501,20 +500,26 @@ void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
} }
} }
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) { if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return; return TSDB_CODE_SUCCESS;
} }
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1); SColumnInfoData* p = NULL;
SColumnInfoData* p = NULL; int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
int32_t status = 0; if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
// todo the keep seems never to be True?? int32_t status = 0;
bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
extractQualifiedTupleByFilterResult(pBlock, p, status);
if (pColMatchInfo != NULL) { if (pColMatchInfo != NULL) {
size_t size = taosArrayGetSize(pColMatchInfo->pList); size_t size = taosArrayGetSize(pColMatchInfo->pList);
@ -529,23 +534,24 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol
} }
} }
} }
code = TSDB_CODE_SUCCESS;
_err:
colDataDestroy(p); colDataDestroy(p);
taosMemoryFree(p); taosMemoryFree(p);
return code;
} }
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) { void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
if (keep) {
return;
}
int8_t* pIndicator = (int8_t*)p->pData; int8_t* pIndicator = (int8_t*)p->pData;
if (status == FILTER_RESULT_ALL_QUALIFIED) { if (status == FILTER_RESULT_ALL_QUALIFIED) {
// here nothing needs to be done // here nothing needs to be done
} else if (status == FILTER_RESULT_NONE_QUALIFIED) { } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0; pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
trimDataBlock(pBlock, pBlock->info.rows, (bool*)pIndicator);
} else { } else {
trimDataBlock(pBlock, pBlock->info.rows, (bool*) pIndicator); qError("unknown filter result type: %d", status);
} }
} }
@ -587,7 +593,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
pCtx[j].resultInfo->numOfRes = pRow->numOfRows; pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
} }
} }
blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes); blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes);
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) { if (TAOS_FAILED(code)) {
@ -1062,5 +1068,5 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) { if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream); downstream->fpSet.reloadStreamStateFn(downstream);
} }
} }

View File

@ -401,9 +401,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalRows -= pBlock->info.rows; pCost->totalRows -= pBlock->info.rows;
if (pOperator->exprSupp.pFilterInfo != NULL) { if (pOperator->exprSupp.pFilterInfo != NULL) {
int64_t st = taosGetTimestampUs(); int32_t code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) return code;
int64_t st = taosGetTimestampUs();
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el; pTableScanInfo->readRecorder.filterTime += el;
@ -2880,7 +2881,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
} else if (kWay <= 2) { } else if (kWay <= 2) {
kWay = 2; kWay = 2;
} else { } else {
int i = 2; int i = 2;
while (i * 2 <= kWay) i = i * 2; while (i * 2 <= kWay) i = i * 2;
kWay = i; kWay = i;
} }

View File

@ -1979,7 +1979,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
int32_t code = sclConvertValueToSclParam(var, &out, NULL); int32_t code = sclConvertValueToSclParam(var, &out, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("convert value to type[%d] failed", type); qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION; return code;
} }
size_t bufBytes = IS_VAR_DATA_TYPE(type) ? varDataTLen(out.columnData->pData) size_t bufBytes = IS_VAR_DATA_TYPE(type) ? varDataTLen(out.columnData->pData)
@ -4644,11 +4644,11 @@ _return:
FLT_RET(code); FLT_RET(code);
} }
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, int16_t numOfCols, int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis,
int32_t *pResultStatus) { int16_t numOfCols, int32_t *pResultStatus) {
if (NULL == info) { if (NULL == info) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED; *pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
return false; return TSDB_CODE_SUCCESS;
} }
SScalarParam output = {0}; SScalarParam output = {0};
@ -4656,7 +4656,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output); int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return code;
} }
if (info->scalarMode) { if (info->scalarMode) {
@ -4666,7 +4666,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
code = scalarCalculate(info->sclCtx.node, pList, &output); code = scalarCalculate(info->sclCtx.node, pList, &output);
taosArrayDestroy(pList); taosArrayDestroy(pList);
FLT_ERR_RET(code); // TODO: current errcode returns as true FLT_ERR_RET(code);
*p = output.columnData; *p = output.columnData;
@ -4677,18 +4677,23 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
} else { } else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED; *pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
} }
return false; return TSDB_CODE_SUCCESS;
}
ASSERT(false == info->scalarMode);
*p = output.columnData;
output.numOfRows = pSrc->info.rows;
if (*p == NULL) {
return TSDB_CODE_APP_ERROR;
}
bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
// todo this should be return during filter procedure
if (keepAll) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
} else { } else {
*p = output.columnData;
output.numOfRows = pSrc->info.rows;
if (*p == NULL) {
return false;
}
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
// todo this should be return during filter procedure
int32_t num = 0; int32_t num = 0;
for (int32_t i = 0; i < output.numOfRows; ++i) { for (int32_t i = 0; i < output.numOfRows; ++i) {
if (((int8_t *)((*p)->pData))[i] == 1) { if (((int8_t *)((*p)->pData))[i] == 1) {
@ -4703,9 +4708,9 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC
} else { } else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED; *pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
} }
return keep;
} }
return TSDB_CODE_SUCCESS;
} }
typedef struct SClassifyConditionCxt { typedef struct SClassifyConditionCxt {

View File

@ -240,15 +240,20 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) {
} }
static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
int64_t value = 0; int64_t value = 0;
if (taosParseTime(buf, &value, strlen(buf), pOut->columnData->info.precision, tsDaylight) != TSDB_CODE_SUCCESS) { if (taosParseTime(buf, &value, strlen(buf), pOut->columnData->info.precision, tsDaylight) != TSDB_CODE_SUCCESS) {
value = 0; value = 0;
terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
} }
colDataSetInt64(pOut->columnData, rowIndex, &value); colDataSetInt64(pOut->columnData, rowIndex, &value);
} }
static FORCE_INLINE void varToSigned(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToSigned(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
if (overflow) { if (overflow) {
int64_t minValue = tDataTypes[pOut->columnData->info.type].minValue; int64_t minValue = tDataTypes[pOut->columnData->info.type].minValue;
int64_t maxValue = tDataTypes[pOut->columnData->info.type].maxValue; int64_t maxValue = tDataTypes[pOut->columnData->info.type].maxValue;
@ -290,6 +295,8 @@ static FORCE_INLINE void varToSigned(char *buf, SScalarParam *pOut, int32_t rowI
} }
static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
if (overflow) { if (overflow) {
uint64_t minValue = (uint64_t)tDataTypes[pOut->columnData->info.type].minValue; uint64_t minValue = (uint64_t)tDataTypes[pOut->columnData->info.type].minValue;
uint64_t maxValue = (uint64_t)tDataTypes[pOut->columnData->info.type].maxValue; uint64_t maxValue = (uint64_t)tDataTypes[pOut->columnData->info.type].maxValue;
@ -330,6 +337,8 @@ static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam *pOut, int32_t ro
} }
static FORCE_INLINE void varToFloat(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToFloat(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
if (TSDB_DATA_TYPE_FLOAT == pOut->columnData->info.type) { if (TSDB_DATA_TYPE_FLOAT == pOut->columnData->info.type) {
float value = taosStr2Float(buf, NULL); float value = taosStr2Float(buf, NULL);
colDataSetFloat(pOut->columnData, rowIndex, &value); colDataSetFloat(pOut->columnData, rowIndex, &value);
@ -341,6 +350,8 @@ static FORCE_INLINE void varToFloat(char *buf, SScalarParam *pOut, int32_t rowIn
} }
static FORCE_INLINE void varToBool(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToBool(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
int64_t value = taosStr2Int64(buf, NULL, 10); int64_t value = taosStr2Int64(buf, NULL, 10);
bool v = (value != 0) ? true : false; bool v = (value != 0) ? true : false;
colDataSetInt8(pOut->columnData, rowIndex, (int8_t *)&v); colDataSetInt8(pOut->columnData, rowIndex, (int8_t *)&v);
@ -348,6 +359,8 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam *pOut, int32_t rowInd
// todo remove this malloc // todo remove this malloc
static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
int32_t len = 0; int32_t len = 0;
int32_t inputLen = varDataLen(buf); int32_t inputLen = varDataLen(buf);
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
@ -357,6 +370,7 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn
taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len); taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len);
if (!ret) { if (!ret) {
sclError("failed to convert to NCHAR"); sclError("failed to convert to NCHAR");
terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
} }
varDataSetLen(t, len); varDataSetLen(t, len);
@ -365,11 +379,14 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn
} }
static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
int32_t inputLen = varDataLen(buf); int32_t inputLen = varDataLen(buf);
char *t = taosMemoryCalloc(1, inputLen + VARSTR_HEADER_SIZE); char *t = taosMemoryCalloc(1, inputLen + VARSTR_HEADER_SIZE);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(buf), varDataLen(buf), varDataVal(t)); int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(buf), varDataLen(buf), varDataVal(t));
if (len < 0) { if (len < 0) {
terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
taosMemoryFree(t); taosMemoryFree(t);
return; return;
} }
@ -379,22 +396,26 @@ static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIn
taosMemoryFree(t); taosMemoryFree(t);
} }
// todo remove this malloc
static FORCE_INLINE void varToGeometry(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { static FORCE_INLINE void varToGeometry(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) {
//[ToDo] support to parse WKB as well as WKT //[ToDo] support to parse WKB as well as WKT
unsigned char *t = NULL; terrno = TSDB_CODE_SUCCESS;
size_t len = 0; size_t len = 0;
unsigned char *t = NULL;
char *output = NULL;
if (initCtxGeomFromText()) { if (initCtxGeomFromText()) {
sclError("failed to init geometry ctx"); sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg);
return; terrno = TSDB_CODE_APP_ERROR;
goto _err;
} }
if (doGeomFromText(buf, &t, &len)) { if (doGeomFromText(buf, &t, &len)) {
sclDebug("failed to convert text to geometry"); sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg);
return; terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
goto _err;
} }
char *output = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); output = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
memcpy(output + VARSTR_HEADER_SIZE, t, len); memcpy(output + VARSTR_HEADER_SIZE, t, len);
varDataSetLen(output, len); varDataSetLen(output, len);
@ -402,10 +423,19 @@ static FORCE_INLINE void varToGeometry(char *buf, SScalarParam *pOut, int32_t ro
taosMemoryFree(output); taosMemoryFree(output);
geosFreeBuffer(t); geosFreeBuffer(t);
return;
_err:
ASSERT(t == NULL && len == 0);
VarDataLenT dummyHeader = 0;
colDataSetVal(pOut->columnData, rowIndex, (const char *)&dummyHeader, false);
} }
// TODO opt performance, tmp is not needed. // TODO opt performance, tmp is not needed.
int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
terrno = TSDB_CODE_SUCCESS;
bool vton = false; bool vton = false;
_bufConverteFunc func = NULL; _bufConverteFunc func = NULL;
@ -431,7 +461,8 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
func = varToGeometry; func = varToGeometry;
} else { } else {
sclError("invalid convert outType:%d, inType:%d", pCtx->outType, pCtx->inType); sclError("invalid convert outType:%d, inType:%d", pCtx->outType, pCtx->inType);
return TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
return terrno;
} }
pCtx->pOut->numOfRows = pCtx->pIn->numOfRows; pCtx->pOut->numOfRows = pCtx->pIn->numOfRows;
@ -451,7 +482,7 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
convertType = TSDB_DATA_TYPE_NCHAR; convertType = TSDB_DATA_TYPE_NCHAR;
} else if (tTagIsJson(data) || *data == TSDB_DATA_TYPE_NULL) { } else if (tTagIsJson(data) || *data == TSDB_DATA_TYPE_NULL) {
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR; terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
return terrno; goto _err;
} else { } else {
convertNumberToNumber(data + CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType); convertNumberToNumber(data + CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType);
continue; continue;
@ -463,7 +494,8 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
tmp = taosMemoryMalloc(bufSize); tmp = taosMemoryMalloc(bufSize);
if (tmp == NULL) { if (tmp == NULL) {
sclError("out of memory in vectorConvertFromVarData"); sclError("out of memory in vectorConvertFromVarData");
return TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
} }
} }
@ -477,15 +509,15 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
// we need to convert it to native char string, and then perform the string to numeric data // we need to convert it to native char string, and then perform the string to numeric data
if (varDataLen(data) > bufSize) { if (varDataLen(data) > bufSize) {
sclError("castConvert convert buffer size too small"); sclError("castConvert convert buffer size too small");
taosMemoryFreeClear(tmp); terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_APP_ERROR; goto _err;
} }
int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(data), varDataLen(data), tmp); int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(data), varDataLen(data), tmp);
if (len < 0) { if (len < 0) {
sclError("castConvert taosUcs4ToMbs error 1"); sclError("castConvert taosUcs4ToMbs error 1");
taosMemoryFreeClear(tmp); terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
return TSDB_CODE_APP_ERROR; goto _err;
} }
tmp[len] = 0; tmp[len] = 0;
@ -493,12 +525,16 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
} }
(*func)(tmp, pCtx->pOut, i, overflow); (*func)(tmp, pCtx->pOut, i, overflow);
if (terrno != TSDB_CODE_SUCCESS) {
goto _err;
}
} }
_err:
if (tmp != NULL) { if (tmp != NULL) {
taosMemoryFreeClear(tmp); taosMemoryFreeClear(tmp);
} }
return TSDB_CODE_SUCCESS; return terrno;
} }
double getVectorDoubleValue_JSON(void *src, int32_t index) { double getVectorDoubleValue_JSON(void *src, int32_t index) {
@ -911,25 +947,25 @@ int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut,
int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = { int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = {
/* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/ /* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/
/*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
/*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 7, 11, 12, 13, 14, 0, 7, 0, 0, 0, 0, /*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 7, 11, 12, 13, 14, 0, 7, 0, 0, 0, -1,
/*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, 0, /*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, -1,
/*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, 0, /*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, -1,
/*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 5, 9, 7, 4, 4, 5, 7, 0, 7, 0, 0, 0, 0, /*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 5, 9, 7, 4, 4, 5, 7, 0, 7, 0, 0, 0, -1,
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 5, 9, 7, 5, 5, 5, 7, 0, 7, 0, 0, 0, 0, /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 5, 9, 7, 5, 5, 5, 7, 0, 7, 0, 0, 0, -1,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, 0, 0, /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, 0, -1,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, 0, 0, /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, 0, -1,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0, 0, 20, /*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0, 0, 20,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, 0, 0, /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, 0, -1,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, 0, /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, -1,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, 0, 0, /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, 0, -1,
/*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0, 0, 0, /*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0, 0, -1,
/*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 7, 0, 0, 0, 0, /*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 7, 0, 0, 0, -1,
/*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, /*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, -1,
/*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1,
/*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1,
/*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1,
/*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1,
/*MEDB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /*MEDB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1,
/*GEOM*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; /*GEOM*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
int32_t vectorGetConvertType(int32_t type1, int32_t type2) { int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
@ -1010,6 +1046,11 @@ int32_t vectorConvertCols(SScalarParam *pLeft, SScalarParam *pRight, SScalarPara
if (0 == type) { if (0 == type) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (-1 == type) {
sclError("invalid convert type1:%d, type2:%d", GET_PARAM_TYPE(param1), GET_PARAM_TYPE(param2));
terrno = TSDB_CODE_SCALAR_CONVERT_ERROR;
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
} }
if (type != GET_PARAM_TYPE(param1)) { if (type != GET_PARAM_TYPE(param1)) {
@ -1753,7 +1794,9 @@ void vectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *
param1 = pLeft; param1 = pLeft;
param2 = pRight; param2 = pRight;
} else { } else {
vectorConvertCols(pLeft, pRight, &pLeftOut, &pRightOut, startIndex, numOfRows); if (vectorConvertCols(pLeft, pRight, &pLeftOut, &pRightOut, startIndex, numOfRows)) {
return;
}
param1 = (pLeftOut.columnData != NULL) ? &pLeftOut : pLeft; param1 = (pLeftOut.columnData != NULL) ? &pLeftOut : pLeft;
param2 = (pRightOut.columnData != NULL) ? &pRightOut : pRight; param2 = (pRightOut.columnData != NULL) ? &pRightOut : pRight;
} }

View File

@ -627,6 +627,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update erro
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is invalid") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is invalid")
//scalar
TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to specific type")
//tmq //tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
@ -676,7 +679,7 @@ const char* tstrerror(int32_t err) {
if ((err & 0x00ff0000) == 0x00ff0000) { if ((err & 0x00ff0000) == 0x00ff0000) {
int32_t code = err & 0x0000ffff; int32_t code = err & 0x0000ffff;
// strerror can handle any invalid code // strerror can handle any invalid code
// invalid code return Unknown error // invalid code return Unknown error
return strerror(code); return strerror(code);
} }
int32_t s = 0; int32_t s = 0;