refactor(query): do some internal refactor.
This commit is contained in:
parent
50ee0dabbf
commit
33c075542d
|
@ -78,7 +78,6 @@ enum {
|
|||
MAIN_SCAN = 0x0u,
|
||||
REVERSE_SCAN = 0x1u, // todo remove it
|
||||
REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan
|
||||
MERGE_STAGE = 0x20u,
|
||||
};
|
||||
|
||||
typedef struct SPoint1 {
|
||||
|
@ -156,11 +155,6 @@ typedef struct SqlFunctionCtx {
|
|||
char udfName[TSDB_FUNC_NAME_LEN];
|
||||
} SqlFunctionCtx;
|
||||
|
||||
enum {
|
||||
TEXPR_BINARYEXPR_NODE = 0x1,
|
||||
TEXPR_UNARYEXPR_NODE = 0x2,
|
||||
};
|
||||
|
||||
typedef struct tExprNode {
|
||||
int32_t nodeType;
|
||||
union {
|
||||
|
@ -182,8 +176,9 @@ struct SScalarParam {
|
|||
SColumnInfoData *columnData;
|
||||
SHashObj *pHashFilter;
|
||||
int32_t hashValueType;
|
||||
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
||||
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
||||
int32_t numOfRows;
|
||||
int32_t numOfQualified; // number of qualified elements in the final results
|
||||
};
|
||||
|
||||
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
|
||||
|
@ -201,8 +196,6 @@ int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint
|
|||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// udf api
|
||||
struct SUdfInfo;
|
||||
|
||||
/**
|
||||
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
|
||||
* @return error code
|
||||
|
@ -226,6 +219,7 @@ int32_t udfStartUdfd(int32_t startDnodeId);
|
|||
* @return
|
||||
*/
|
||||
int32_t udfStopUdfd();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -31,13 +31,17 @@ enum {
|
|||
FLT_OPTION_NEED_UNIQE = 4,
|
||||
};
|
||||
|
||||
#define FILTER_RESULT_ALL_QUALIFIED 0x1
|
||||
#define FILTER_RESULT_NONE_QUALIFIED 0x2
|
||||
#define FILTER_RESULT_PARTIAL_QUALIFIED 0x3
|
||||
|
||||
typedef struct SFilterColumnParam {
|
||||
int32_t numOfCols;
|
||||
SArray *pDataBlock;
|
||||
} SFilterColumnParam;
|
||||
|
||||
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
|
||||
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* pFilterResStatus);
|
||||
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
|
||||
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
|
||||
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
|
||||
|
|
|
@ -81,11 +81,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
|||
|
||||
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
|
||||
|
||||
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
|
||||
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
||||
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
|
@ -1115,7 +1110,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
|||
}
|
||||
}
|
||||
|
||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
|
||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status);
|
||||
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
|
||||
if (pFilterNode == NULL || pBlock->info.rows == 0) {
|
||||
|
@ -1126,18 +1121,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
|
|||
|
||||
// todo move to the initialization function
|
||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
|
||||
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
|
||||
code = filterSetDataFromSlotId(filter, ¶m1);
|
||||
|
||||
int8_t* rowRes = NULL;
|
||||
SColumnInfoData* p = NULL;
|
||||
int32_t status = 0;
|
||||
|
||||
// todo the keep seems never to be True??
|
||||
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
||||
bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);
|
||||
filterFreeInfo(filter);
|
||||
|
||||
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
||||
extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
|
||||
|
||||
if (pColMatchInfo != NULL) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
|
||||
|
@ -1152,16 +1146,22 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
|
|||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(rowRes);
|
||||
colDataDestroy(p);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
|
||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
|
||||
if (keep) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (rowRes != NULL) {
|
||||
int32_t totalRows = pBlock->info.rows;
|
||||
int32_t totalRows = pBlock->info.rows;
|
||||
|
||||
if (status == FILTER_RESULT_ALL_QUALIFIED) {
|
||||
// here nothing needs to be done
|
||||
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
|
||||
pBlock->info.rows = 0;
|
||||
} else {
|
||||
SSDataBlock* px = createOneDataBlock(pBlock, true);
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
@ -1177,7 +1177,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
|||
|
||||
int32_t numOfRows = 0;
|
||||
for (int32_t j = 0; j < totalRows; ++j) {
|
||||
if (rowRes[j] == 0) {
|
||||
if (((int8_t*)p->pData)[j] == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1189,6 +1189,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
|||
numOfRows += 1;
|
||||
}
|
||||
|
||||
// todo this value can be assigned directly
|
||||
if (pBlock->info.rows == totalRows) {
|
||||
pBlock->info.rows = numOfRows;
|
||||
} else {
|
||||
|
@ -1197,13 +1198,10 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
|||
}
|
||||
|
||||
blockDataDestroy(px); // fix memory leak
|
||||
} else {
|
||||
// do nothing
|
||||
pBlock->info.rows = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
||||
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
||||
// for simple group by query without interval, all the tables belong to one group result.
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||
|
@ -4242,10 +4240,10 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
|
|||
|
||||
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||
if (pCtx[j].fpSet.finalize) {
|
||||
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TAOS_FAILED(code)) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||
if (TAOS_FAILED(code1)) {
|
||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
|
||||
T_LONG_JMP(pTaskInfo->env, code1);
|
||||
}
|
||||
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||
// do nothing, todo refactor
|
||||
|
|
|
@ -26,11 +26,6 @@ typedef struct SFuncMgtService {
|
|||
SHashObj* pFuncNameHashTable;
|
||||
} SFuncMgtService;
|
||||
|
||||
typedef struct SUdfInfo {
|
||||
SDataType outputDt;
|
||||
int8_t funcType;
|
||||
} SUdfInfo;
|
||||
|
||||
static SFuncMgtService gFunMgtService;
|
||||
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
||||
static int32_t initFunctionCode = 0;
|
||||
|
|
|
@ -99,7 +99,7 @@ typedef struct SFilterRange {
|
|||
|
||||
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
|
||||
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
|
||||
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t);
|
||||
typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
|
||||
|
||||
typedef struct SFilterRangeCompare {
|
||||
|
|
|
@ -2976,14 +2976,12 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
uint32_t *unitIdx = NULL;
|
||||
|
||||
if (*p == NULL) {
|
||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
||||
}
|
||||
int8_t* p = (int8_t*)pRes->pData;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
//FILTER_UNIT_CLR_F(info);
|
||||
|
@ -3002,35 +3000,35 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
|
|||
uint8_t optr = cunit->optr;
|
||||
|
||||
if (colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
|
||||
(*p)[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
||||
p[i] = (optr == OP_TYPE_IS_NULL) ? true : false;
|
||||
} else {
|
||||
if (optr == OP_TYPE_IS_NOT_NULL) {
|
||||
(*p)[i] = 1;
|
||||
p[i] = 1;
|
||||
} else if (optr == OP_TYPE_IS_NULL) {
|
||||
(*p)[i] = 0;
|
||||
p[i] = 0;
|
||||
} else if (cunit->rfunc >= 0) {
|
||||
(*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
||||
p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
||||
} else {
|
||||
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
||||
p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
||||
}
|
||||
|
||||
//FILTER_UNIT_SET_R(info, uidx, p[i]);
|
||||
//FILTER_UNIT_SET_F(info, uidx);
|
||||
}
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((*p)[i]) {
|
||||
if (p[i]) {
|
||||
break;
|
||||
}
|
||||
|
||||
unitIdx += unitNum;
|
||||
}
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
}
|
||||
}
|
||||
|
@ -3040,7 +3038,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
|
|||
|
||||
|
||||
|
||||
int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) {
|
||||
int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) {
|
||||
if (statis && numOfRows >= FILTER_RM_UNIT_MIN_ROWS) {
|
||||
info->blkFlag = 0;
|
||||
|
||||
|
@ -3058,7 +3056,6 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t*
|
|||
assert(info->unitNum > 1);
|
||||
|
||||
*all = filterExecuteBasedOnStatisImpl(info, numOfRows, p, statis, numOfCols);
|
||||
|
||||
goto _return;
|
||||
}
|
||||
}
|
||||
|
@ -3067,59 +3064,55 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t*
|
|||
|
||||
_return:
|
||||
info->blkFlag = 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
return true;
|
||||
}
|
||||
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
return false;
|
||||
}
|
||||
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
|
||||
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
||||
return all;
|
||||
}
|
||||
int8_t* p = (int8_t*)pRes->pData;
|
||||
|
||||
if (*p == NULL) {
|
||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||
return all;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||
(*p)[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
||||
p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
}
|
||||
}
|
||||
|
||||
return all;
|
||||
}
|
||||
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||
return all;
|
||||
}
|
||||
|
||||
if (*p == NULL) {
|
||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
||||
}
|
||||
int8_t* p = (int8_t*)pRes->pData;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||
|
||||
(*p)[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
||||
if ((*p)[i] == 0) {
|
||||
p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
}
|
||||
}
|
||||
|
@ -3127,7 +3120,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
|
|||
return all;
|
||||
}
|
||||
|
||||
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
uint16_t dataSize = info->cunits[0].dataSize;
|
||||
|
@ -3136,13 +3129,11 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
|
|||
void *valData2 = info->cunits[0].valData2;
|
||||
__compar_fn_t func = gDataCompare[info->cunits[0].func];
|
||||
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||
return all;
|
||||
}
|
||||
|
||||
if (*p == NULL) {
|
||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
||||
}
|
||||
int8_t* p = (int8_t*) pRes->pData;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
|
||||
|
@ -3152,9 +3143,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
|
|||
continue;
|
||||
}
|
||||
|
||||
(*p)[i] = (*rfunc)(colData, colData, valData, valData2, func);
|
||||
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
}
|
||||
}
|
||||
|
@ -3162,23 +3153,21 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
|
|||
return all;
|
||||
}
|
||||
|
||||
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||
return all;
|
||||
}
|
||||
|
||||
if (*p == NULL) {
|
||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
||||
}
|
||||
int8_t* p = (int8_t*) pRes->pData;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
|
||||
(*p)[i] = 0;
|
||||
p[i] = 0;
|
||||
all = false;
|
||||
continue;
|
||||
}
|
||||
|
@ -3191,14 +3180,14 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
|
|||
qError("castConvert1 taosUcs4ToMbs error");
|
||||
}else{
|
||||
varDataSetLen(newColData, len);
|
||||
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData);
|
||||
p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData);
|
||||
}
|
||||
taosMemoryFreeClear(newColData);
|
||||
}else{
|
||||
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
|
||||
p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
|
||||
}
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
}
|
||||
}
|
||||
|
@ -3207,17 +3196,15 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
|
|||
}
|
||||
|
||||
|
||||
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||
bool all = true;
|
||||
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
||||
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||
return all;
|
||||
}
|
||||
|
||||
if (*p == NULL) {
|
||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
||||
}
|
||||
int8_t* p = (int8_t*) pRes->pData;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
//FILTER_UNIT_CLR_F(info);
|
||||
|
@ -3235,14 +3222,14 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
|
|||
uint8_t optr = cunit->optr;
|
||||
|
||||
if (colData == NULL || colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
|
||||
(*p)[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
||||
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
||||
} else {
|
||||
if (optr == OP_TYPE_IS_NOT_NULL) {
|
||||
(*p)[i] = 1;
|
||||
p[i] = 1;
|
||||
} else if (optr == OP_TYPE_IS_NULL) {
|
||||
(*p)[i] = 0;
|
||||
p[i] = 0;
|
||||
} else if (cunit->rfunc >= 0) {
|
||||
(*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
||||
p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
||||
} else {
|
||||
if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){
|
||||
char *newColData = taosMemoryCalloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
|
||||
|
@ -3251,11 +3238,11 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
|
|||
qError("castConvert1 taosUcs4ToMbs error");
|
||||
}else{
|
||||
varDataSetLen(newColData, len);
|
||||
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData);
|
||||
p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData);
|
||||
}
|
||||
taosMemoryFreeClear(newColData);
|
||||
}else{
|
||||
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
||||
p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3263,17 +3250,17 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
|
|||
//FILTER_UNIT_SET_F(info, uidx);
|
||||
}
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((*p)[i]) {
|
||||
if (p[i]) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((*p)[i] == 0) {
|
||||
if (p[i] == 0) {
|
||||
all = false;
|
||||
}
|
||||
}
|
||||
|
@ -4026,38 +4013,63 @@ _return:
|
|||
FLT_RET(code);
|
||||
}
|
||||
|
||||
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t *pResultStatus) {
|
||||
if (NULL == info) {
|
||||
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
|
||||
return false;
|
||||
}
|
||||
|
||||
SScalarParam output = {0};
|
||||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||
|
||||
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (info->scalarMode) {
|
||||
SScalarParam output = {0};
|
||||
|
||||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SArray *pList = taosArrayInit(1, POINTER_BYTES);
|
||||
taosArrayPush(pList, &pSrc);
|
||||
|
||||
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
|
||||
*p = taosMemoryMalloc(output.numOfRows * sizeof(bool));
|
||||
|
||||
memcpy(*p, output.columnData->pData, output.numOfRows);
|
||||
colDataDestroy(output.columnData);
|
||||
taosMemoryFree(output.columnData);
|
||||
*p = output.columnData;
|
||||
|
||||
taosArrayDestroy(pList);
|
||||
|
||||
if (output.numOfQualified == output.numOfRows) {
|
||||
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
|
||||
} else if (output.numOfQualified == 0) {
|
||||
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
|
||||
} else {
|
||||
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
*p = output.columnData;
|
||||
output.numOfRows = pSrc->info.rows;
|
||||
|
||||
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols);
|
||||
|
||||
// todo this should be return during filter procedure
|
||||
int32_t num = 0;
|
||||
for(int32_t i = 0; i < output.numOfRows; ++i) {
|
||||
if (((int8_t*)((*p)->pData))[i] == 1) {
|
||||
++num;
|
||||
}
|
||||
}
|
||||
|
||||
if (num == output.numOfRows) {
|
||||
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
|
||||
} else if (num == 0) {
|
||||
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
|
||||
} else {
|
||||
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
|
||||
}
|
||||
|
||||
return keep;
|
||||
}
|
||||
|
||||
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols);
|
||||
}
|
||||
|
||||
|
||||
typedef struct SClassifyConditionCxt {
|
||||
bool hasPrimaryKey;
|
||||
bool hasTagIndexCol;
|
||||
|
|
|
@ -1646,38 +1646,60 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
|
|||
doReleaseVec(pRightCol, rightConvert);
|
||||
}
|
||||
|
||||
#define VEC_COM_INNER(pCol, index1, index2) \
|
||||
for (; i < pCol->numOfRows && i >= 0; i += step) {\
|
||||
if (IS_HELPER_NULL(pLeft->columnData, index1) || IS_HELPER_NULL(pRight->columnData, index2)) {\
|
||||
bool res = false;\
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\
|
||||
continue;\
|
||||
}\
|
||||
char *pLeftData = colDataGetData(pLeft->columnData, index1);\
|
||||
char *pRightData = colDataGetData(pRight->columnData, index2);\
|
||||
int64_t leftOut = 0;\
|
||||
int64_t rightOut = 0;\
|
||||
bool freeLeft = false;\
|
||||
bool freeRight = false;\
|
||||
bool isJsonnull = false;\
|
||||
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight),\
|
||||
&pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);\
|
||||
if(isJsonnull){\
|
||||
ASSERT(0);\
|
||||
}\
|
||||
if(!pLeftData || !pRightData){\
|
||||
result = false;\
|
||||
}\
|
||||
if(!result){\
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);\
|
||||
}else{\
|
||||
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);\
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\
|
||||
}\
|
||||
if(freeLeft) taosMemoryFreeClear(pLeftData);\
|
||||
if(freeRight) taosMemoryFreeClear(pRightData);\
|
||||
int32_t doVectorCompareImpl(int32_t numOfRows, SScalarParam *pOut, int32_t startIndex, int32_t step, __compar_fn_t fp,
|
||||
SScalarParam *pLeft, SScalarParam *pRight, int32_t optr) {
|
||||
int32_t num = 0;
|
||||
|
||||
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
|
||||
int32_t leftIndex = (i >= pLeft->numOfRows)? 0:i;
|
||||
int32_t rightIndex = (i >= pRight->numOfRows)? 0:i;
|
||||
|
||||
if (IS_HELPER_NULL(pLeft->columnData, leftIndex) || IS_HELPER_NULL(pRight->columnData, rightIndex)) {
|
||||
bool res = false;
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t *)&res);
|
||||
continue;
|
||||
}
|
||||
|
||||
char * pLeftData = colDataGetData(pLeft->columnData, leftIndex);
|
||||
char * pRightData = colDataGetData(pRight->columnData, rightIndex);
|
||||
int64_t leftOut = 0;
|
||||
int64_t rightOut = 0;
|
||||
bool freeLeft = false;
|
||||
bool freeRight = false;
|
||||
bool isJsonnull = false;
|
||||
|
||||
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData,
|
||||
&leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);
|
||||
if (isJsonnull) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (!pLeftData || !pRightData) {
|
||||
result = false;
|
||||
}
|
||||
|
||||
if (!result) {
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t *)&result);
|
||||
} else {
|
||||
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t *)&res);
|
||||
if (res) {
|
||||
++num;
|
||||
}
|
||||
}
|
||||
|
||||
if (freeLeft) {
|
||||
taosMemoryFreeClear(pLeftData);
|
||||
}
|
||||
|
||||
if (freeRight) {
|
||||
taosMemoryFreeClear(pRightData);
|
||||
}
|
||||
}
|
||||
|
||||
return num;
|
||||
}
|
||||
|
||||
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
|
||||
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
|
||||
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
||||
|
@ -1704,16 +1726,12 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *
|
|||
char *pLeftData = colDataGetData(pLeft->columnData, i);
|
||||
bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter);
|
||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
|
||||
if (res) {
|
||||
pOut->numOfQualified++;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (pLeft->numOfRows == pRight->numOfRows) {
|
||||
VEC_COM_INNER(pLeft, i, i)
|
||||
} else if (pRight->numOfRows == 1) {
|
||||
VEC_COM_INNER(pLeft, i, 0)
|
||||
} else if (pLeft->numOfRows == 1) {
|
||||
VEC_COM_INNER(pRight, 0, i)
|
||||
} else { // normal compare
|
||||
pOut->numOfQualified = doVectorCompareImpl(pOut->numOfRows, pOut, i, step, fp, pLeft, pRight, optr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue