Merge pull request #16970 from taosdata/feature/3_liaohj

refactor(query): do some internal refactor about  the filter  and  interp  query.
This commit is contained in:
Haojun Liao 2022-09-22 17:09:30 +08:00 committed by GitHub
commit aa62d4430b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 307 additions and 236 deletions

View File

@ -78,7 +78,6 @@ enum {
MAIN_SCAN = 0x0u, MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it REVERSE_SCAN = 0x1u, // todo remove it
REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan
MERGE_STAGE = 0x20u,
}; };
typedef struct SPoint1 { typedef struct SPoint1 {
@ -156,11 +155,6 @@ typedef struct SqlFunctionCtx {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
enum {
TEXPR_BINARYEXPR_NODE = 0x1,
TEXPR_UNARYEXPR_NODE = 0x2,
};
typedef struct tExprNode { typedef struct tExprNode {
int32_t nodeType; int32_t nodeType;
union { union {
@ -184,6 +178,7 @@ struct SScalarParam {
int32_t hashValueType; int32_t hashValueType;
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
int32_t numOfRows; int32_t numOfRows;
int32_t numOfQualified; // number of qualified elements in the final results
}; };
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
@ -201,8 +196,6 @@ int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api // udf api
struct SUdfInfo;
/** /**
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
* @return error code * @return error code
@ -226,6 +219,7 @@ int32_t udfStartUdfd(int32_t startDnodeId);
* @return * @return
*/ */
int32_t udfStopUdfd(); int32_t udfStopUdfd();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -31,13 +31,17 @@ enum {
FLT_OPTION_NEED_UNIQE = 4, FLT_OPTION_NEED_UNIQE = 4,
}; };
#define FILTER_RESULT_ALL_QUALIFIED 0x1
#define FILTER_RESULT_NONE_QUALIFIED 0x2
#define FILTER_RESULT_PARTIAL_QUALIFIED 0x3
typedef struct SFilterColumnParam { typedef struct SFilterColumnParam {
int32_t numOfCols; int32_t numOfCols;
SArray *pDataBlock; SArray *pDataBlock;
} SFilterColumnParam; } SFilterColumnParam;
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options); extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols); extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* pFilterResStatus);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict); extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);

View File

@ -193,6 +193,8 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader); static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -893,7 +895,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
if (pData->cid < pColData->info.colId) { if (pData->cid < pColData->info.colId) {
colIndex += 1; colIndex += 1;
} else if (pData->cid == pColData->info.colId) { } else if (pData->cid == pColData->info.colId) {
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL) { if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL|HAS_NONE)) {
colDataAppendNNULL(pColData, 0, remain); colDataAppendNNULL(pColData, 0, remain);
} else { } else {
if (IS_NUMERIC_TYPE(pColData->info.type) && asc) { if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
@ -1537,7 +1539,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = k.ts; minKey = k.ts;
} }
if (minKey > key && pBlockData->nRow > 0) { if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
minKey = key; minKey = key;
} }
} else { } else {
@ -1550,7 +1552,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = k.ts; minKey = k.ts;
} }
if (minKey < key && pBlockData->nRow > 0) { if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
minKey = key; minKey = key;
} }
} }
@ -1688,7 +1690,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
if (pBlockData->nRow > 0) { if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
// no last block available, only data block exists // no last block available, only data block exists
if (!hasDataInLastBlock(pLastBlockReader)) { if (!hasDataInLastBlock(pLastBlockReader)) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
@ -1753,7 +1755,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsLast = getCurrentKeyInLastBlock(pLastBlockReader); tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
} }
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
@ -1769,7 +1771,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = ik.ts; minKey = ik.ts;
} }
if (minKey > key && pBlockData->nRow > 0) { if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
minKey = key; minKey = key;
} }
@ -1786,7 +1788,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = ik.ts; minKey = ik.ts;
} }
if (minKey < key && pBlockData->nRow > 0) { if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
minKey = key; minKey = key;
} }
@ -2021,6 +2023,13 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
} }
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if (pBlockData->nRow > 0) {
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
}
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
}
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader) { STsdbReader* pReader) {
@ -2052,7 +2061,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
} else { } else {
@ -3290,9 +3299,31 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
return TSDB_CODE_SUCCESS;
} else {
return initForFirstBlockInFile(pReader, pBlockIter);
}
}
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
const char* idstr) { const char* idstr) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
pCond->twindows.ekey -= 1;
}
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr); int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
@ -3300,21 +3331,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
// check for query time window // check for query time window
STsdbReader* pReader = *ppReader; STsdbReader* pReader = *ppReader;
if (isEmptyQueryTimeWindow(&pReader->window)) { if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
// update the SQueryTableDataCond to create inner reader // update the SQueryTableDataCond to create inner reader
STimeWindow w = pCond->twindows;
int32_t order = pCond->order; int32_t order = pCond->order;
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
pCond->twindows.ekey = pCond->twindows.skey; pCond->twindows.ekey = window.skey;
pCond->twindows.skey = INT64_MIN; pCond->twindows.skey = INT64_MIN;
pCond->order = TSDB_ORDER_DESC; pCond->order = TSDB_ORDER_DESC;
} else { } else {
pCond->twindows.skey = pCond->twindows.ekey; pCond->twindows.skey = window.ekey;
pCond->twindows.ekey = INT64_MAX; pCond->twindows.ekey = INT64_MAX;
pCond->order = TSDB_ORDER_ASC; pCond->order = TSDB_ORDER_ASC;
} }
@ -3326,12 +3356,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
pCond->twindows.skey = w.ekey; pCond->twindows.skey = window.ekey;
pCond->twindows.ekey = INT64_MAX; pCond->twindows.ekey = INT64_MAX;
} else { } else {
pCond->twindows.skey = INT64_MIN; pCond->twindows.skey = INT64_MIN;
pCond->twindows.ekey = w.ekey; pCond->twindows.ekey = window.ekey;
} }
pCond->order = order;
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr); code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
@ -3340,20 +3372,22 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here. // NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
if (pCond->suid != 0) { if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr); tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr);
} }
} else if (taosArrayGetSize(pTableList) > 0) { } else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0); STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr); tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr);
} }
} }
STsdbReader* p = pReader->innerReader[0] != NULL? pReader->innerReader[0]:pReader;
int32_t numOfTables = taosArrayGetSize(pTableList); int32_t numOfTables = taosArrayGetSize(pTableList);
pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables); pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables);
if (pReader->status.pTableMap == NULL) { if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader); tsdbReaderClose(pReader);
*ppReader = NULL; *ppReader = NULL;
@ -3368,40 +3402,36 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter; code = doOpenReaderImpl(pReader);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
}
} else { } else {
STsdbReader* pPrevReader = pReader->innerReader[0]; STsdbReader* pPrevReader = pReader->innerReader[0];
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; STsdbReader* pNextReader = pReader->innerReader[1];
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr); // we need only one row
if (code != TSDB_CODE_SUCCESS) { pPrevReader->capacity = 1;
goto _err; pPrevReader->status.pTableMap = pReader->status.pTableMap;
} pPrevReader->pReadSnap = pReader->pReadSnap;
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader); pNextReader->capacity = 1;
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order); pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->pReadSnap = pReader->pReadSnap;
// no data in files, let's try buffer in memory code = doOpenReaderImpl(pPrevReader);
if (pPrevReader->status.fileIter.numOfFiles == 0) {
pPrevReader->status.loadFromFile = false;
} else {
code = initForFirstBlockInFile(pPrevReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = doOpenReaderImpl(pNextReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
} }
} }
@ -3418,6 +3448,19 @@ void tsdbReaderClose(STsdbReader* pReader) {
return; return;
} }
{
if (pReader->innerReader[0] != NULL) {
pReader->innerReader[0]->status.pTableMap = NULL;
pReader->innerReader[0]->pReadSnap = NULL;
pReader->innerReader[1]->status.pTableMap = NULL;
pReader->innerReader[1]->pReadSnap = NULL;
tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]);
}
}
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
taosMemoryFreeClear(pSupInfo->plist); taosMemoryFreeClear(pSupInfo->plist);
@ -3508,32 +3551,32 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false; return false;
} }
if (pReader->innerReader[0] != NULL) { if (pReader->innerReader[0] != NULL && pReader->step == 0) {
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
if (ret) { resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey);
pReader->step = EXTERNAL_ROWS_PREV; pReader->step = EXTERNAL_ROWS_PREV;
if (ret) {
return ret; return ret;
} }
tsdbReaderClose(pReader->innerReader[0]);
pReader->innerReader[0] = NULL;
} }
if (pReader->step == EXTERNAL_ROWS_PREV) {
pReader->step = EXTERNAL_ROWS_MAIN; pReader->step = EXTERNAL_ROWS_MAIN;
}
bool ret = doTsdbNextDataBlock(pReader); bool ret = doTsdbNextDataBlock(pReader);
if (ret) { if (ret) {
return ret; return ret;
} }
if (pReader->innerReader[1] != NULL) { if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
if (ret1) {
pReader->step = EXTERNAL_ROWS_NEXT; pReader->step = EXTERNAL_ROWS_NEXT;
if (ret1) {
return ret1; return ret1;
} }
tsdbReaderClose(pReader->innerReader[1]);
pReader->innerReader[1] = NULL;
} }
return false; return false;

View File

@ -81,11 +81,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
return 0;
}
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
static void releaseQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables);
@ -1115,7 +1110,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
} }
} }
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) { void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
if (pFilterNode == NULL || pBlock->info.rows == 0) { if (pFilterNode == NULL || pBlock->info.rows == 0) {
@ -1126,18 +1121,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
// todo move to the initialization function // todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1); code = filterSetDataFromSlotId(filter, &param1);
int8_t* rowRes = NULL; SColumnInfoData* p = NULL;
int32_t status = 0;
// todo the keep seems never to be True?? // todo the keep seems never to be True??
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);
filterFreeInfo(filter); filterFreeInfo(filter);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
if (pColMatchInfo != NULL) { if (pColMatchInfo != NULL) {
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
@ -1152,16 +1146,22 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
} }
} }
taosMemoryFree(rowRes); colDataDestroy(p);
taosMemoryFree(p);
} }
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
if (keep) { if (keep) {
return; return;
} }
if (rowRes != NULL) {
int32_t totalRows = pBlock->info.rows; int32_t totalRows = pBlock->info.rows;
if (status == FILTER_RESULT_ALL_QUALIFIED) {
// here nothing needs to be done
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
} else {
SSDataBlock* px = createOneDataBlock(pBlock, true); SSDataBlock* px = createOneDataBlock(pBlock, true);
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -1177,7 +1177,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t j = 0; j < totalRows; ++j) { for (int32_t j = 0; j < totalRows; ++j) {
if (rowRes[j] == 0) { if (((int8_t*)p->pData)[j] == 0) {
continue; continue;
} }
@ -1189,6 +1189,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
numOfRows += 1; numOfRows += 1;
} }
// todo this value can be assigned directly
if (pBlock->info.rows == totalRows) { if (pBlock->info.rows == totalRows) {
pBlock->info.rows = numOfRows; pBlock->info.rows = numOfRows;
} else { } else {
@ -1197,9 +1198,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
} }
blockDataDestroy(px); // fix memory leak blockDataDestroy(px); // fix memory leak
} else {
// do nothing
pBlock->info.rows = 0;
} }
} }
@ -4246,10 +4244,10 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) { if (pCtx[j].fpSet.finalize) {
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) { if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code1);
} }
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor // do nothing, todo refactor

View File

@ -2075,7 +2075,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
} }
} }
pSliceInfo->fillLastPoint = isLastRow ? true : false; pSliceInfo->fillLastPoint = isLastRow;
} }
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
@ -2294,15 +2294,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pSliceInfo->pRes; SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator);
// }
//
// return pResBlock;
// }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval; SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -2432,6 +2423,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break; break;
} }
} }
} else {
// store ts value as start, and calculate interp value when processing next block
doKeepLinearInfo(pSliceInfo, pBlock, i, true);
} }
} else { // non-linear interpolation } else { // non-linear interpolation
if (i < pBlock->info.rows - 1) { if (i < pBlock->info.rows - 1) {
@ -2510,6 +2504,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
break; break;
} }
} }
} else { // it is the last row of current block
// store ts value as start, and calculate interp value when processing next block
doKeepLinearInfo(pSliceInfo, pBlock, i, true);
} }
} else { // non-linear interpolation } else { // non-linear interpolation
pSliceInfo->current = pSliceInfo->current =
@ -2615,6 +2612,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->interval.interval = pInterpPhyNode->interval; pInfo->interval.interval = pInterpPhyNode->interval;
pInfo->current = pInfo->win.skey; pInfo->current = pInfo->win.skey;
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
pOperator->name = "TimeSliceOperator"; pOperator->name = "TimeSliceOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
pOperator->blocking = false; pOperator->blocking = false;

View File

@ -26,11 +26,6 @@ typedef struct SFuncMgtService {
SHashObj* pFuncNameHashTable; SHashObj* pFuncNameHashTable;
} SFuncMgtService; } SFuncMgtService;
typedef struct SUdfInfo {
SDataType outputDt;
int8_t funcType;
} SUdfInfo;
static SFuncMgtService gFunMgtService; static SFuncMgtService gFunMgtService;
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT; static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
static int32_t initFunctionCode = 0; static int32_t initFunctionCode = 0;

View File

@ -99,7 +99,7 @@ typedef struct SFilterRange {
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t); typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
typedef int32_t(*filter_desc_compare_func)(const void *, const void *); typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t); typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols);
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **); typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
typedef struct SFilterRangeCompare { typedef struct SFilterRangeCompare {

View File

@ -2976,14 +2976,12 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
uint32_t *unitIdx = NULL; uint32_t *unitIdx = NULL;
if (*p == NULL) { int8_t* p = (int8_t*)pRes->pData;
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
//FILTER_UNIT_CLR_F(info); //FILTER_UNIT_CLR_F(info);
@ -3002,35 +3000,35 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
uint8_t optr = cunit->optr; uint8_t optr = cunit->optr;
if (colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) { if (colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
(*p)[i] = optr == OP_TYPE_IS_NULL ? true : false; p[i] = (optr == OP_TYPE_IS_NULL) ? true : false;
} else { } else {
if (optr == OP_TYPE_IS_NOT_NULL) { if (optr == OP_TYPE_IS_NOT_NULL) {
(*p)[i] = 1; p[i] = 1;
} else if (optr == OP_TYPE_IS_NULL) { } else if (optr == OP_TYPE_IS_NULL) {
(*p)[i] = 0; p[i] = 0;
} else if (cunit->rfunc >= 0) { } else if (cunit->rfunc >= 0) {
(*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]); p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
} else { } else {
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData); p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
} }
//FILTER_UNIT_SET_R(info, uidx, p[i]); //FILTER_UNIT_SET_R(info, uidx, p[i]);
//FILTER_UNIT_SET_F(info, uidx); //FILTER_UNIT_SET_F(info, uidx);
} }
if ((*p)[i] == 0) { if (p[i] == 0) {
break; break;
} }
} }
if ((*p)[i]) { if (p[i]) {
break; break;
} }
unitIdx += unitNum; unitIdx += unitNum;
} }
if ((*p)[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} }
} }
@ -3040,7 +3038,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) { int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) {
if (statis && numOfRows >= FILTER_RM_UNIT_MIN_ROWS) { if (statis && numOfRows >= FILTER_RM_UNIT_MIN_ROWS) {
info->blkFlag = 0; info->blkFlag = 0;
@ -3058,7 +3056,6 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t*
assert(info->unitNum > 1); assert(info->unitNum > 1);
*all = filterExecuteBasedOnStatisImpl(info, numOfRows, p, statis, numOfCols); *all = filterExecuteBasedOnStatisImpl(info, numOfRows, p, statis, numOfCols);
goto _return; goto _return;
} }
} }
@ -3067,59 +3064,55 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t*
_return: _return:
info->blkFlag = 0; info->blkFlag = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
return true; return true;
} }
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
return false; return false;
} }
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { int8_t* p = (int8_t*)pRes->pData;
return all;
}
if (*p == NULL) { if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t)); return all;
} }
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
(*p)[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
if ((*p)[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} }
} }
return all; return all;
} }
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
return all; return all;
} }
if (*p == NULL) { int8_t* p = (int8_t*)pRes->pData;
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
(*p)[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)); p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
if ((*p)[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} }
} }
@ -3127,7 +3120,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
return all; return all;
} }
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
uint16_t dataSize = info->cunits[0].dataSize; uint16_t dataSize = info->cunits[0].dataSize;
@ -3136,13 +3129,11 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
void *valData2 = info->cunits[0].valData2; void *valData2 = info->cunits[0].valData2;
__compar_fn_t func = gDataCompare[info->cunits[0].func]; __compar_fn_t func = gDataCompare[info->cunits[0].func];
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
return all; return all;
} }
if (*p == NULL) { int8_t* p = (int8_t*) pRes->pData;
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
@ -3152,9 +3143,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
continue; continue;
} }
(*p)[i] = (*rfunc)(colData, colData, valData, valData2, func); p[i] = (*rfunc)(colData, colData, valData, valData2, func);
if ((*p)[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} }
} }
@ -3162,23 +3153,21 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
return all; return all;
} }
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
return all; return all;
} }
if (*p == NULL) { int8_t* p = (int8_t*) pRes->pData;
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0]; uint32_t uidx = info->groups[0].unitIdxs[0];
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) { if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
(*p)[i] = 0; p[i] = 0;
all = false; all = false;
continue; continue;
} }
@ -3191,14 +3180,14 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
qError("castConvert1 taosUcs4ToMbs error"); qError("castConvert1 taosUcs4ToMbs error");
}else{ }else{
varDataSetLen(newColData, len); varDataSetLen(newColData, len);
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData); p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData);
} }
taosMemoryFreeClear(newColData); taosMemoryFreeClear(newColData);
}else{ }else{
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData); p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
} }
if ((*p)[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} }
} }
@ -3207,17 +3196,15 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
} }
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) { if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
return all; return all;
} }
if (*p == NULL) { int8_t* p = (int8_t*) pRes->pData;
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
}
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
//FILTER_UNIT_CLR_F(info); //FILTER_UNIT_CLR_F(info);
@ -3235,14 +3222,14 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
uint8_t optr = cunit->optr; uint8_t optr = cunit->optr;
if (colData == NULL || colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) { if (colData == NULL || colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
(*p)[i] = optr == OP_TYPE_IS_NULL ? true : false; p[i] = optr == OP_TYPE_IS_NULL ? true : false;
} else { } else {
if (optr == OP_TYPE_IS_NOT_NULL) { if (optr == OP_TYPE_IS_NOT_NULL) {
(*p)[i] = 1; p[i] = 1;
} else if (optr == OP_TYPE_IS_NULL) { } else if (optr == OP_TYPE_IS_NULL) {
(*p)[i] = 0; p[i] = 0;
} else if (cunit->rfunc >= 0) { } else if (cunit->rfunc >= 0) {
(*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]); p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
} else { } else {
if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){ if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){
char *newColData = taosMemoryCalloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1); char *newColData = taosMemoryCalloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
@ -3251,11 +3238,11 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
qError("castConvert1 taosUcs4ToMbs error"); qError("castConvert1 taosUcs4ToMbs error");
}else{ }else{
varDataSetLen(newColData, len); varDataSetLen(newColData, len);
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData); p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData);
} }
taosMemoryFreeClear(newColData); taosMemoryFreeClear(newColData);
}else{ }else{
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData); p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
} }
} }
@ -3263,17 +3250,17 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
//FILTER_UNIT_SET_F(info, uidx); //FILTER_UNIT_SET_F(info, uidx);
} }
if ((*p)[i] == 0) { if (p[i] == 0) {
break; break;
} }
} }
if ((*p)[i]) { if (p[i]) {
break; break;
} }
} }
if ((*p)[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} }
} }
@ -4026,37 +4013,62 @@ _return:
FLT_RET(code); FLT_RET(code);
} }
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t *pResultStatus) {
if (NULL == info) { if (NULL == info) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
return false;
}
SScalarParam output = {0};
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
if (info->scalarMode) { if (info->scalarMode) {
SScalarParam output = {0};
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SArray *pList = taosArrayInit(1, POINTER_BYTES); SArray *pList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pList, &pSrc); taosArrayPush(pList, &pSrc);
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
*p = taosMemoryMalloc(output.numOfRows * sizeof(bool)); *p = output.columnData;
memcpy(*p, output.columnData->pData, output.numOfRows);
colDataDestroy(output.columnData);
taosMemoryFree(output.columnData);
taosArrayDestroy(pList); taosArrayDestroy(pList);
if (output.numOfQualified == output.numOfRows) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
} else if (output.numOfQualified == 0) {
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
} else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
}
return false; return false;
} else {
*p = output.columnData;
output.numOfRows = pSrc->info.rows;
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols);
// todo this should be return during filter procedure
int32_t num = 0;
for(int32_t i = 0; i < output.numOfRows; ++i) {
if (((int8_t*)((*p)->pData))[i] == 1) {
++num;
}
} }
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols); if (num == output.numOfRows) {
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
} else if (num == 0) {
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
} else {
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
} }
return keep;
}
}
typedef struct SClassifyConditionCxt { typedef struct SClassifyConditionCxt {
bool hasPrimaryKey; bool hasPrimaryKey;

View File

@ -606,6 +606,8 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
SCL_ERR_JRET(code); SCL_ERR_JRET(code);
} }
int32_t numOfQualified = 0;
bool value = false; bool value = false;
bool complete = true; bool complete = true;
for (int32_t i = 0; i < rowNum; ++i) { for (int32_t i = 0; i < rowNum; ++i) {
@ -631,6 +633,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
if (complete) { if (complete) {
colDataAppend(output->columnData, i, (char*) &value, false); colDataAppend(output->columnData, i, (char*) &value, false);
if (value) {
numOfQualified++;
}
} }
} }
@ -639,8 +644,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
output->numOfRows = 0; output->numOfRows = 0;
} }
_return: output->numOfQualified = numOfQualified;
_return:
sclFreeParamList(params, paramNum); sclFreeParamList(params, paramNum);
SCL_RET(code); SCL_RET(code);
} }
@ -1242,6 +1248,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
pDst->numOfRows = res->numOfRows; pDst->numOfRows = res->numOfRows;
pDst->numOfQualified = res->numOfQualified;
} }
sclFreeParam(res); sclFreeParam(res);
@ -1249,7 +1256,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
} }
_return: _return:
//nodesDestroyNode(pNode);
sclFreeRes(ctx.pRes); sclFreeRes(ctx.pRes);
return code; return code;
} }

View File

@ -1475,19 +1475,19 @@ void vectorMathMinus(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
void vectorAssign(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorAssign(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
SColumnInfoData *pOutputCol = pOut->columnData; SColumnInfoData *pOutputCol = pOut->columnData;
pOut->numOfRows = pLeft->numOfRows; pOut->numOfRows = pLeft->numOfRows;
// if (IS_HELPER_NULL(pRight->columnData, 0)) {
if(colDataIsNull_s(pRight->columnData, 0)){ if(colDataIsNull_s(pRight->columnData, 0)){
for (int32_t i = 0; i < pOut->numOfRows; ++i) { colDataAppendNNULL(pOutputCol, 0, pOut->numOfRows);
colDataAppend(pOutputCol, i, NULL, true);
}
} else { } else {
char* d = colDataGetData(pRight->columnData, 0);
for (int32_t i = 0; i < pOut->numOfRows; ++i) { for (int32_t i = 0; i < pOut->numOfRows; ++i) {
colDataAppend(pOutputCol, i, colDataGetData(pRight->columnData, 0), false); colDataAppend(pOutputCol, i, d, false);
} }
} }
ASSERT(pRight->numOfQualified == 1 || pRight->numOfQualified == 0);
pOut->numOfQualified = pRight->numOfQualified * pOut->numOfRows;
} }
void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) { void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
@ -1646,36 +1646,58 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
doReleaseVec(pRightCol, rightConvert); doReleaseVec(pRightCol, rightConvert);
} }
#define VEC_COM_INNER(pCol, index1, index2) \ int32_t doVectorCompareImpl(int32_t numOfRows, SScalarParam *pOut, int32_t startIndex, int32_t step, __compar_fn_t fp,
for (; i < pCol->numOfRows && i >= 0; i += step) {\ SScalarParam *pLeft, SScalarParam *pRight, int32_t optr) {
if (IS_HELPER_NULL(pLeft->columnData, index1) || IS_HELPER_NULL(pRight->columnData, index2)) {\ int32_t num = 0;
bool res = false;\
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\ for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
continue;\ int32_t leftIndex = (i >= pLeft->numOfRows)? 0:i;
}\ int32_t rightIndex = (i >= pRight->numOfRows)? 0:i;
char *pLeftData = colDataGetData(pLeft->columnData, index1);\
char *pRightData = colDataGetData(pRight->columnData, index2);\ if (IS_HELPER_NULL(pLeft->columnData, leftIndex) || IS_HELPER_NULL(pRight->columnData, rightIndex)) {
int64_t leftOut = 0;\ bool res = false;
int64_t rightOut = 0;\ colDataAppendInt8(pOut->columnData, i, (int8_t *)&res);
bool freeLeft = false;\ continue;
bool freeRight = false;\ }
bool isJsonnull = false;\
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight),\ char * pLeftData = colDataGetData(pLeft->columnData, leftIndex);
&pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);\ char * pRightData = colDataGetData(pRight->columnData, rightIndex);
if(isJsonnull){\ int64_t leftOut = 0;
ASSERT(0);\ int64_t rightOut = 0;
}\ bool freeLeft = false;
if(!pLeftData || !pRightData){\ bool freeRight = false;
result = false;\ bool isJsonnull = false;
}\
if(!result){\ bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData,
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);\ &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);
}else{\ if (isJsonnull) {
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);\ ASSERT(0);
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\ }
}\
if(freeLeft) taosMemoryFreeClear(pLeftData);\ if (!pLeftData || !pRightData) {
if(freeRight) taosMemoryFreeClear(pRightData);\ result = false;
}
if (!result) {
colDataAppendInt8(pOut->columnData, i, (int8_t *)&result);
} else {
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);
colDataAppendInt8(pOut->columnData, i, (int8_t *)&res);
if (res) {
++num;
}
}
if (freeLeft) {
taosMemoryFreeClear(pLeftData);
}
if (freeRight) {
taosMemoryFreeClear(pRightData);
}
}
return num;
} }
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) { void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
@ -1704,16 +1726,12 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *
char *pLeftData = colDataGetData(pLeft->columnData, i); char *pLeftData = colDataGetData(pLeft->columnData, i);
bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter); bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter);
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res); colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
if (res) {
pOut->numOfQualified++;
} }
return;
} }
} else { // normal compare
if (pLeft->numOfRows == pRight->numOfRows) { pOut->numOfQualified = doVectorCompareImpl(pOut->numOfRows, pOut, i, step, fp, pLeft, pRight, optr);
VEC_COM_INNER(pLeft, i, i)
} else if (pRight->numOfRows == 1) {
VEC_COM_INNER(pLeft, i, 0)
} else if (pLeft->numOfRows == 1) {
VEC_COM_INNER(pRight, 0, i)
} }
} }

View File

@ -595,11 +595,11 @@ class TDTestCase:
tdSql.checkData(2, i, 15) tdSql.checkData(2, i, 15)
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(linear)") tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(linear)")
tdSql.checkRows(1) tdSql.checkRows(3)
tdSql.checkCols(4) tdSql.checkCols(4)
for i in range (tdSql.queryCols): for i in range (tdSql.queryCols):
tdSql.checkData(0, i, 15) tdSql.checkData(0, i, 13)
tdLog.printNoPrefix("==========step10:test error cases") tdLog.printNoPrefix("==========step10:test error cases")