Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression
This commit is contained in:
commit
11d41db7f8
|
@ -2,7 +2,7 @@
|
||||||
# taosadapter
|
# taosadapter
|
||||||
ExternalProject_Add(taosadapter
|
ExternalProject_Add(taosadapter
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||||
GIT_TAG 05fb2ff
|
GIT_TAG be729ab
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -78,7 +78,6 @@ enum {
|
||||||
MAIN_SCAN = 0x0u,
|
MAIN_SCAN = 0x0u,
|
||||||
REVERSE_SCAN = 0x1u, // todo remove it
|
REVERSE_SCAN = 0x1u, // todo remove it
|
||||||
REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan
|
REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan
|
||||||
MERGE_STAGE = 0x20u,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SPoint1 {
|
typedef struct SPoint1 {
|
||||||
|
@ -156,11 +155,6 @@ typedef struct SqlFunctionCtx {
|
||||||
char udfName[TSDB_FUNC_NAME_LEN];
|
char udfName[TSDB_FUNC_NAME_LEN];
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
||||||
enum {
|
|
||||||
TEXPR_BINARYEXPR_NODE = 0x1,
|
|
||||||
TEXPR_UNARYEXPR_NODE = 0x2,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct tExprNode {
|
typedef struct tExprNode {
|
||||||
int32_t nodeType;
|
int32_t nodeType;
|
||||||
union {
|
union {
|
||||||
|
@ -184,6 +178,7 @@ struct SScalarParam {
|
||||||
int32_t hashValueType;
|
int32_t hashValueType;
|
||||||
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
|
int32_t numOfQualified; // number of qualified elements in the final results
|
||||||
};
|
};
|
||||||
|
|
||||||
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
|
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
|
||||||
|
@ -201,8 +196,6 @@ int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// udf api
|
// udf api
|
||||||
struct SUdfInfo;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
|
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
|
||||||
* @return error code
|
* @return error code
|
||||||
|
@ -226,6 +219,7 @@ int32_t udfStartUdfd(int32_t startDnodeId);
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t udfStopUdfd();
|
int32_t udfStopUdfd();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -31,13 +31,17 @@ enum {
|
||||||
FLT_OPTION_NEED_UNIQE = 4,
|
FLT_OPTION_NEED_UNIQE = 4,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define FILTER_RESULT_ALL_QUALIFIED 0x1
|
||||||
|
#define FILTER_RESULT_NONE_QUALIFIED 0x2
|
||||||
|
#define FILTER_RESULT_PARTIAL_QUALIFIED 0x3
|
||||||
|
|
||||||
typedef struct SFilterColumnParam {
|
typedef struct SFilterColumnParam {
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
SArray *pDataBlock;
|
SArray *pDataBlock;
|
||||||
} SFilterColumnParam;
|
} SFilterColumnParam;
|
||||||
|
|
||||||
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
|
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
|
||||||
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols);
|
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* pFilterResStatus);
|
||||||
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
|
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
|
||||||
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
|
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
|
||||||
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
|
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
|
||||||
|
|
|
@ -251,6 +251,7 @@ typedef struct SRequestObj {
|
||||||
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
||||||
uint32_t retry;
|
uint32_t retry;
|
||||||
int64_t allocatorRefId;
|
int64_t allocatorRefId;
|
||||||
|
SQuery* pQuery;
|
||||||
} SRequestObj;
|
} SRequestObj;
|
||||||
|
|
||||||
typedef struct SSyncQueryParam {
|
typedef struct SSyncQueryParam {
|
||||||
|
|
|
@ -350,6 +350,7 @@ void doDestroyRequest(void *p) {
|
||||||
taosArrayDestroy(pRequest->tableList);
|
taosArrayDestroy(pRequest->tableList);
|
||||||
taosArrayDestroy(pRequest->dbList);
|
taosArrayDestroy(pRequest->dbList);
|
||||||
taosArrayDestroy(pRequest->targetTableList);
|
taosArrayDestroy(pRequest->targetTableList);
|
||||||
|
qDestroyQuery(pRequest->pQuery);
|
||||||
nodesDestroyAllocator(pRequest->allocatorRefId);
|
nodesDestroyAllocator(pRequest->allocatorRefId);
|
||||||
|
|
||||||
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
||||||
|
|
|
@ -274,7 +274,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
SClientHbBatchRsp pRsp = {0};
|
SClientHbBatchRsp pRsp = {0};
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
|
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
|
||||||
}
|
|
||||||
|
|
||||||
int32_t now = taosGetTimestampSec();
|
int32_t now = taosGetTimestampSec();
|
||||||
int32_t delta = abs(now - pRsp.svrTimestamp);
|
int32_t delta = abs(now - pRsp.svrTimestamp);
|
||||||
|
@ -282,6 +281,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
code = TSDB_CODE_TIME_UNSYNCED;
|
code = TSDB_CODE_TIME_UNSYNCED;
|
||||||
tscError("time diff: %ds is too big", delta);
|
tscError("time diff: %ds is too big", delta);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
|
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
|
||||||
|
|
||||||
|
|
|
@ -670,7 +670,6 @@ typedef struct SqlParseWrapper {
|
||||||
SParseContext *pCtx;
|
SParseContext *pCtx;
|
||||||
SCatalogReq catalogReq;
|
SCatalogReq catalogReq;
|
||||||
SRequestObj *pRequest;
|
SRequestObj *pRequest;
|
||||||
SQuery *pQuery;
|
|
||||||
} SqlParseWrapper;
|
} SqlParseWrapper;
|
||||||
|
|
||||||
static void destoryTablesReq(void *p) {
|
static void destoryTablesReq(void *p) {
|
||||||
|
@ -696,8 +695,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
|
||||||
|
|
||||||
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
||||||
SQuery *pQuery = pWrapper->pQuery;
|
|
||||||
SRequestObj *pRequest = pWrapper->pRequest;
|
SRequestObj *pRequest = pWrapper->pRequest;
|
||||||
|
SQuery *pQuery = pRequest->pQuery;
|
||||||
|
|
||||||
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
pRequest->metric.ctgEnd = taosGetTimestampUs();
|
||||||
|
|
||||||
|
@ -726,10 +725,10 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||||
qDestroyQuery(pQuery);
|
|
||||||
} else {
|
} else {
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pRequest->pQuery);
|
||||||
|
pRequest->pQuery = NULL;
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
|
@ -802,12 +801,10 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQuery *pQuery = NULL;
|
|
||||||
|
|
||||||
pRequest->metric.syntaxStart = taosGetTimestampUs();
|
pRequest->metric.syntaxStart = taosGetTimestampUs();
|
||||||
|
|
||||||
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
|
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
|
||||||
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
|
code = qParseSqlSyntax(pCxt, &pRequest->pQuery, &catalogReq);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -817,9 +814,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
if (!updateMetaForce) {
|
if (!updateMetaForce) {
|
||||||
STscObj *pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
if (NULL == pQuery->pRoot) {
|
if (NULL == pRequest->pQuery->pRoot) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
|
||||||
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
} else if (QUERY_NODE_SELECT_STMT == pRequest->pQuery->pRoot->type) {
|
||||||
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
|
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -831,7 +828,6 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pWrapper->pCtx = pCxt;
|
pWrapper->pCtx = pCxt;
|
||||||
pWrapper->pQuery = pQuery;
|
|
||||||
pWrapper->pRequest = pRequest;
|
pWrapper->pRequest = pRequest;
|
||||||
pWrapper->catalogReq = catalogReq;
|
pWrapper->catalogReq = catalogReq;
|
||||||
|
|
||||||
|
|
|
@ -1278,7 +1278,9 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t cap = dst->info.capacity;
|
||||||
dst->info = src->info;
|
dst->info = src->info;
|
||||||
|
dst->info.capacity = cap;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1302,8 +1304,9 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||||
|
|
||||||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||||
}
|
}
|
||||||
|
uint32_t cap = dst->info.capacity;
|
||||||
dst->info = src->info;
|
dst->info = src->info;
|
||||||
|
dst->info.capacity = cap;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1333,6 +1336,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
|
||||||
// group id
|
// group id
|
||||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||||
|
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
infoData.info.bytes = sizeof(TSKEY);
|
||||||
// calculate start ts
|
// calculate start ts
|
||||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||||
// calculate end ts
|
// calculate end ts
|
||||||
|
@ -2280,4 +2285,3 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
||||||
ASSERT(pStart - pData == dataLen);
|
ASSERT(pStart - pData == dataLen);
|
||||||
return pStart;
|
return pStart;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -193,6 +193,8 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
||||||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
|
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||||
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
||||||
|
|
||||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||||
|
|
||||||
|
@ -893,7 +895,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
|
||||||
if (pData->cid < pColData->info.colId) {
|
if (pData->cid < pColData->info.colId) {
|
||||||
colIndex += 1;
|
colIndex += 1;
|
||||||
} else if (pData->cid == pColData->info.colId) {
|
} else if (pData->cid == pColData->info.colId) {
|
||||||
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL) {
|
if (pData->flag == HAS_NONE || pData->flag == HAS_NULL || pData->flag == (HAS_NULL|HAS_NONE)) {
|
||||||
colDataAppendNNULL(pColData, 0, remain);
|
colDataAppendNNULL(pColData, 0, remain);
|
||||||
} else {
|
} else {
|
||||||
if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
|
if (IS_NUMERIC_TYPE(pColData->info.type) && asc) {
|
||||||
|
@ -1537,7 +1539,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = k.ts;
|
minKey = k.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey > key && pBlockData->nRow > 0) {
|
if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1550,7 +1552,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = k.ts;
|
minKey = k.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey < key && pBlockData->nRow > 0) {
|
if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1688,7 +1690,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
if (pBlockData->nRow > 0) {
|
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
// no last block available, only data block exists
|
// no last block available, only data block exists
|
||||||
if (!hasDataInLastBlock(pLastBlockReader)) {
|
if (!hasDataInLastBlock(pLastBlockReader)) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
|
@ -1753,7 +1755,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||||
|
@ -1769,7 +1771,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = ik.ts;
|
minKey = ik.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey > key && pBlockData->nRow > 0) {
|
if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1786,7 +1788,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = ik.ts;
|
minKey = ik.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey < key && pBlockData->nRow > 0) {
|
if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2021,6 +2023,13 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
||||||
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
|
if (pBlockData->nRow > 0) {
|
||||||
|
ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
|
@ -2052,7 +2061,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||||
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
|
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
|
||||||
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
||||||
} else {
|
} else {
|
||||||
|
@ -3290,9 +3299,31 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
||||||
|
|
||||||
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
|
||||||
|
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||||
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||||
|
|
||||||
|
// no data in files, let's try buffer in memory
|
||||||
|
if (pReader->status.fileIter.numOfFiles == 0) {
|
||||||
|
pReader->status.loadFromFile = false;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
return initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ====================================== EXPOSED APIs ======================================
|
// ====================================== EXPOSED APIs ======================================
|
||||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
|
||||||
const char* idstr) {
|
const char* idstr) {
|
||||||
|
STimeWindow window = pCond->twindows;
|
||||||
|
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
|
pCond->twindows.skey += 1;
|
||||||
|
pCond->twindows.ekey -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
|
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, 4096, idstr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -3300,21 +3331,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
|
|
||||||
// check for query time window
|
// check for query time window
|
||||||
STsdbReader* pReader = *ppReader;
|
STsdbReader* pReader = *ppReader;
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window)) {
|
if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
|
||||||
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
|
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
// update the SQueryTableDataCond to create inner reader
|
// update the SQueryTableDataCond to create inner reader
|
||||||
STimeWindow w = pCond->twindows;
|
|
||||||
int32_t order = pCond->order;
|
int32_t order = pCond->order;
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
pCond->twindows.ekey = pCond->twindows.skey;
|
pCond->twindows.ekey = window.skey;
|
||||||
pCond->twindows.skey = INT64_MIN;
|
pCond->twindows.skey = INT64_MIN;
|
||||||
pCond->order = TSDB_ORDER_DESC;
|
pCond->order = TSDB_ORDER_DESC;
|
||||||
} else {
|
} else {
|
||||||
pCond->twindows.skey = pCond->twindows.ekey;
|
pCond->twindows.skey = window.ekey;
|
||||||
pCond->twindows.ekey = INT64_MAX;
|
pCond->twindows.ekey = INT64_MAX;
|
||||||
pCond->order = TSDB_ORDER_ASC;
|
pCond->order = TSDB_ORDER_ASC;
|
||||||
}
|
}
|
||||||
|
@ -3326,12 +3356,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
}
|
}
|
||||||
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
if (order == TSDB_ORDER_ASC) {
|
||||||
pCond->twindows.skey = w.ekey;
|
pCond->twindows.skey = window.ekey;
|
||||||
pCond->twindows.ekey = INT64_MAX;
|
pCond->twindows.ekey = INT64_MAX;
|
||||||
} else {
|
} else {
|
||||||
pCond->twindows.skey = INT64_MIN;
|
pCond->twindows.skey = INT64_MIN;
|
||||||
pCond->twindows.ekey = w.ekey;
|
pCond->twindows.ekey = window.ekey;
|
||||||
}
|
}
|
||||||
|
pCond->order = order;
|
||||||
|
|
||||||
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
|
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[1], 1, idstr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -3340,20 +3372,22 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
|
|
||||||
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
|
// NOTE: the endVersion in pCond is the data version not schema version, so pCond->endVersion is not correct here.
|
||||||
if (pCond->suid != 0) {
|
if (pCond->suid != 0) {
|
||||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1);
|
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
|
||||||
if (pReader->pSchema == NULL) {
|
if (pReader->pSchema == NULL) {
|
||||||
tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr);
|
tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr);
|
||||||
}
|
}
|
||||||
} else if (taosArrayGetSize(pTableList) > 0) {
|
} else if (taosArrayGetSize(pTableList) > 0) {
|
||||||
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
|
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
|
||||||
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1);
|
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1);
|
||||||
if (pReader->pSchema == NULL) {
|
if (pReader->pSchema == NULL) {
|
||||||
tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr);
|
tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STsdbReader* p = pReader->innerReader[0] != NULL? pReader->innerReader[0]:pReader;
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pTableList);
|
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||||
pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
|
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables);
|
||||||
if (pReader->status.pTableMap == NULL) {
|
if (pReader->status.pTableMap == NULL) {
|
||||||
tsdbReaderClose(pReader);
|
tsdbReaderClose(pReader);
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
|
@ -3368,40 +3402,36 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
code = doOpenReaderImpl(pReader);
|
||||||
|
|
||||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
|
||||||
if (pReader->status.fileIter.numOfFiles == 0) {
|
|
||||||
pReader->status.loadFromFile = false;
|
|
||||||
} else {
|
|
||||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
STsdbReader* pPrevReader = pReader->innerReader[0];
|
STsdbReader* pPrevReader = pReader->innerReader[0];
|
||||||
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
|
STsdbReader* pNextReader = pReader->innerReader[1];
|
||||||
|
|
||||||
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr);
|
// we need only one row
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
pPrevReader->capacity = 1;
|
||||||
goto _err;
|
pPrevReader->status.pTableMap = pReader->status.pTableMap;
|
||||||
}
|
pPrevReader->pReadSnap = pReader->pReadSnap;
|
||||||
|
|
||||||
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
|
pNextReader->capacity = 1;
|
||||||
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);
|
pNextReader->status.pTableMap = pReader->status.pTableMap;
|
||||||
|
pNextReader->pReadSnap = pReader->pReadSnap;
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
code = doOpenReaderImpl(pPrevReader);
|
||||||
if (pPrevReader->status.fileIter.numOfFiles == 0) {
|
|
||||||
pPrevReader->status.loadFromFile = false;
|
|
||||||
} else {
|
|
||||||
code = initForFirstBlockInFile(pPrevReader, pBlockIter);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = doOpenReaderImpl(pNextReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = doOpenReaderImpl(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3418,6 +3448,19 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
if (pReader->innerReader[0] != NULL) {
|
||||||
|
pReader->innerReader[0]->status.pTableMap = NULL;
|
||||||
|
pReader->innerReader[0]->pReadSnap = NULL;
|
||||||
|
|
||||||
|
pReader->innerReader[1]->status.pTableMap = NULL;
|
||||||
|
pReader->innerReader[1]->pReadSnap = NULL;
|
||||||
|
|
||||||
|
tsdbReaderClose(pReader->innerReader[0]);
|
||||||
|
tsdbReaderClose(pReader->innerReader[1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
|
|
||||||
taosMemoryFreeClear(pSupInfo->plist);
|
taosMemoryFreeClear(pSupInfo->plist);
|
||||||
|
@ -3508,32 +3551,32 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->innerReader[0] != NULL) {
|
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
||||||
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
|
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
|
||||||
if (ret) {
|
resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey);
|
||||||
pReader->step = EXTERNAL_ROWS_PREV;
|
pReader->step = EXTERNAL_ROWS_PREV;
|
||||||
|
|
||||||
|
if (ret) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderClose(pReader->innerReader[0]);
|
|
||||||
pReader->innerReader[0] = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pReader->step == EXTERNAL_ROWS_PREV) {
|
||||||
pReader->step = EXTERNAL_ROWS_MAIN;
|
pReader->step = EXTERNAL_ROWS_MAIN;
|
||||||
|
}
|
||||||
|
|
||||||
bool ret = doTsdbNextDataBlock(pReader);
|
bool ret = doTsdbNextDataBlock(pReader);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->innerReader[1] != NULL) {
|
if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
|
||||||
|
resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
|
||||||
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
|
bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]);
|
||||||
if (ret1) {
|
|
||||||
pReader->step = EXTERNAL_ROWS_NEXT;
|
pReader->step = EXTERNAL_ROWS_NEXT;
|
||||||
|
if (ret1) {
|
||||||
return ret1;
|
return ret1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderClose(pReader->innerReader[1]);
|
|
||||||
pReader->innerReader[1] = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -81,11 +81,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
||||||
|
|
||||||
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
|
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
|
||||||
|
|
||||||
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
|
|
||||||
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
|
||||||
|
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
|
@ -1115,7 +1110,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
|
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status);
|
||||||
|
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
|
||||||
if (pFilterNode == NULL || pBlock->info.rows == 0) {
|
if (pFilterNode == NULL || pBlock->info.rows == 0) {
|
||||||
|
@ -1126,18 +1121,17 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
|
||||||
|
|
||||||
// todo move to the initialization function
|
// todo move to the initialization function
|
||||||
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
|
||||||
|
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
|
|
||||||
code = filterSetDataFromSlotId(filter, ¶m1);
|
code = filterSetDataFromSlotId(filter, ¶m1);
|
||||||
|
|
||||||
int8_t* rowRes = NULL;
|
SColumnInfoData* p = NULL;
|
||||||
|
int32_t status = 0;
|
||||||
|
|
||||||
// todo the keep seems never to be True??
|
// todo the keep seems never to be True??
|
||||||
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);
|
||||||
filterFreeInfo(filter);
|
filterFreeInfo(filter);
|
||||||
|
|
||||||
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
|
||||||
|
|
||||||
if (pColMatchInfo != NULL) {
|
if (pColMatchInfo != NULL) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
|
||||||
|
@ -1152,16 +1146,22 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(rowRes);
|
colDataDestroy(p);
|
||||||
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
|
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
|
||||||
if (keep) {
|
if (keep) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rowRes != NULL) {
|
|
||||||
int32_t totalRows = pBlock->info.rows;
|
int32_t totalRows = pBlock->info.rows;
|
||||||
|
|
||||||
|
if (status == FILTER_RESULT_ALL_QUALIFIED) {
|
||||||
|
// here nothing needs to be done
|
||||||
|
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
|
||||||
|
pBlock->info.rows = 0;
|
||||||
|
} else {
|
||||||
SSDataBlock* px = createOneDataBlock(pBlock, true);
|
SSDataBlock* px = createOneDataBlock(pBlock, true);
|
||||||
|
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
@ -1177,7 +1177,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
for (int32_t j = 0; j < totalRows; ++j) {
|
for (int32_t j = 0; j < totalRows; ++j) {
|
||||||
if (rowRes[j] == 0) {
|
if (((int8_t*)p->pData)[j] == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1189,6 +1189,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo this value can be assigned directly
|
||||||
if (pBlock->info.rows == totalRows) {
|
if (pBlock->info.rows == totalRows) {
|
||||||
pBlock->info.rows = numOfRows;
|
pBlock->info.rows = numOfRows;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1197,13 +1198,10 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(px); // fix memory leak
|
blockDataDestroy(px); // fix memory leak
|
||||||
} else {
|
|
||||||
// do nothing
|
|
||||||
pBlock->info.rows = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
||||||
// for simple group by query without interval, all the tables belong to one group result.
|
// for simple group by query without interval, all the tables belong to one group result.
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
@ -4246,10 +4244,10 @@ int32_t buildDataBlockFromGroupRes(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock
|
||||||
|
|
||||||
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
|
||||||
if (pCtx[j].fpSet.finalize) {
|
if (pCtx[j].fpSet.finalize) {
|
||||||
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
|
||||||
if (TAOS_FAILED(code)) {
|
if (TAOS_FAILED(code1)) {
|
||||||
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
|
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code1);
|
||||||
}
|
}
|
||||||
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
// do nothing, todo refactor
|
// do nothing, todo refactor
|
||||||
|
|
|
@ -1303,10 +1303,14 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t*
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
|
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
|
||||||
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
|
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
|
||||||
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
|
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
|
||||||
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
|
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
|
||||||
|
colDataAppendNULL(pCalStartCol, pBlock->info.rows);
|
||||||
|
colDataAppendNULL(pCalEndCol, pBlock->info.rows);
|
||||||
pBlock->info.rows++;
|
pBlock->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1115,7 +1115,7 @@ static bool compareVal(const char* v, const SStateKeys* pKey) {
|
||||||
if (varDataLen(v) != varDataLen(pKey->pData)) {
|
if (varDataLen(v) != varDataLen(pKey->pData)) {
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return strncmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
|
return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return memcmp(pKey->pData, v, pKey->bytes) == 0;
|
return memcmp(pKey->pData, v, pKey->bytes) == 0;
|
||||||
|
@ -2075,7 +2075,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pSliceInfo->fillLastPoint = isLastRow ? true : false;
|
pSliceInfo->fillLastPoint = isLastRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
|
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
|
||||||
|
@ -2294,15 +2294,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
// if (pOperator->status == OP_RES_TO_RETURN) {
|
|
||||||
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
|
||||||
// if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) {
|
|
||||||
// doSetOperatorCompleted(pOperator);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return pResBlock;
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
@ -2432,6 +2423,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// store ts value as start, and calculate interp value when processing next block
|
||||||
|
doKeepLinearInfo(pSliceInfo, pBlock, i, true);
|
||||||
}
|
}
|
||||||
} else { // non-linear interpolation
|
} else { // non-linear interpolation
|
||||||
if (i < pBlock->info.rows - 1) {
|
if (i < pBlock->info.rows - 1) {
|
||||||
|
@ -2510,6 +2504,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else { // it is the last row of current block
|
||||||
|
// store ts value as start, and calculate interp value when processing next block
|
||||||
|
doKeepLinearInfo(pSliceInfo, pBlock, i, true);
|
||||||
}
|
}
|
||||||
} else { // non-linear interpolation
|
} else { // non-linear interpolation
|
||||||
pSliceInfo->current =
|
pSliceInfo->current =
|
||||||
|
@ -2615,6 +2612,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->interval.interval = pInterpPhyNode->interval;
|
pInfo->interval.interval = pInterpPhyNode->interval;
|
||||||
pInfo->current = pInfo->win.skey;
|
pInfo->current = pInfo->win.skey;
|
||||||
|
|
||||||
|
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
||||||
|
pScanInfo->cond.twindows = pInfo->win;
|
||||||
|
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
|
||||||
|
|
||||||
pOperator->name = "TimeSliceOperator";
|
pOperator->name = "TimeSliceOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
|
|
|
@ -207,7 +207,6 @@ static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return numOfSpaces;
|
return numOfSpaces;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void static addTimezoneParam(SNodeList* pList) {
|
void static addTimezoneParam(SNodeList* pList) {
|
||||||
|
@ -331,7 +330,6 @@ static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len
|
||||||
numOfSpaces = countTrailingSpaces(pValue, isLtrim);
|
numOfSpaces = countTrailingSpaces(pValue, isLtrim);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t resBytes = pPara1->resType.bytes - numOfSpaces;
|
int32_t resBytes = pPara1->resType.bytes - numOfSpaces;
|
||||||
pFunc->node.resType = (SDataType){.bytes = resBytes, .type = pPara1->resType.type};
|
pFunc->node.resType = (SDataType){.bytes = resBytes, .type = pPara1->resType.type};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2141,7 +2139,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "avg",
|
.name = "avg",
|
||||||
.type = FUNCTION_TYPE_AVG,
|
.type = FUNCTION_TYPE_AVG,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED,
|
||||||
.translateFunc = translateInNumOutDou,
|
.translateFunc = translateInNumOutDou,
|
||||||
.dataRequiredFunc = statisDataRequired,
|
.dataRequiredFunc = statisDataRequired,
|
||||||
.getEnvFunc = getAvgFuncEnv,
|
.getEnvFunc = getAvgFuncEnv,
|
||||||
|
|
|
@ -47,6 +47,7 @@ typedef struct SSumRes {
|
||||||
uint64_t usum;
|
uint64_t usum;
|
||||||
double dsum;
|
double dsum;
|
||||||
};
|
};
|
||||||
|
int16_t type;
|
||||||
} SSumRes;
|
} SSumRes;
|
||||||
|
|
||||||
typedef struct SAvgRes {
|
typedef struct SAvgRes {
|
||||||
|
@ -73,6 +74,7 @@ typedef struct SMinmaxResInfo {
|
||||||
|
|
||||||
STuplePos nullTuplePos;
|
STuplePos nullTuplePos;
|
||||||
bool nullTupleSaved;
|
bool nullTupleSaved;
|
||||||
|
int16_t type;
|
||||||
} SMinmaxResInfo;
|
} SMinmaxResInfo;
|
||||||
|
|
||||||
typedef struct STopBotResItem {
|
typedef struct STopBotResItem {
|
||||||
|
@ -485,8 +487,7 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
int32_t bytes = pDBuf->bytes;
|
||||||
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
@ -617,6 +618,7 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pSumRes->type = type;
|
||||||
|
|
||||||
if (IS_NULL_TYPE(type)) {
|
if (IS_NULL_TYPE(type)) {
|
||||||
numOfElem = 0;
|
numOfElem = 0;
|
||||||
|
@ -740,10 +742,10 @@ int32_t sumInvertFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SSumRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SSumRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SSumRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SSumRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||||
pDBuf->isum += pSBuf->isum;
|
pDBuf->isum += pSBuf->isum;
|
||||||
|
@ -1072,10 +1074,10 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
pDBuf->sum.isum += pSBuf->sum.isum;
|
pDBuf->sum.isum += pSBuf->sum.isum;
|
||||||
|
@ -1181,6 +1183,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo);
|
SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
pBuf->type = type;
|
||||||
|
|
||||||
if (IS_NULL_TYPE(type)) {
|
if (IS_NULL_TYPE(type)) {
|
||||||
numOfElems = 0;
|
numOfElems = 0;
|
||||||
|
@ -1729,10 +1732,10 @@ void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
|
||||||
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t isMinFunc) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SMinmaxResInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SMinmaxResInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SMinmaxResInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||||
if (IS_FLOAT_TYPE(type)) {
|
if (IS_FLOAT_TYPE(type)) {
|
||||||
if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) {
|
if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) {
|
||||||
*(double*)&pDBuf->v = *(double*)&pSBuf->v;
|
*(double*)&pDBuf->v = *(double*)&pSBuf->v;
|
||||||
|
@ -2105,10 +2108,10 @@ int32_t stddevPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SStddevRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type;
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
pDBuf->isum += pSBuf->isum;
|
pDBuf->isum += pSBuf->isum;
|
||||||
|
@ -3069,8 +3072,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
int32_t bytes = pDBuf->bytes;
|
||||||
int32_t bytes = pDestCtx->input.pData[0]->info.bytes;
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
@ -3746,9 +3748,9 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
||||||
|
int16_t type = pSBuf->type;
|
||||||
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
||||||
addResult(pDestCtx, pSBuf->pItems + i, type, true);
|
addResult(pDestCtx, pSBuf->pItems + i, type, true);
|
||||||
}
|
}
|
||||||
|
@ -3756,9 +3758,9 @@ int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
int32_t type = pDestCtx->input.pData[0]->info.type;
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
STopBotRes* pSBuf = getTopBotOutputInfo(pSourceCtx);
|
||||||
|
int16_t type = pSBuf->type;
|
||||||
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
for (int32_t i = 0; i < pSResInfo->numOfRes; i++) {
|
||||||
addResult(pDestCtx, pSBuf->pItems + i, type, false);
|
addResult(pDestCtx, pSBuf->pItems + i, type, false);
|
||||||
}
|
}
|
||||||
|
@ -5414,7 +5416,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t i = pInput->startRowIndex;
|
int32_t i = pInput->startRowIndex;
|
||||||
if (pCtx->start.key != INT64_MIN) {
|
if (pCtx->start.key != INT64_MIN) {
|
||||||
//ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) ||
|
// ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) ||
|
||||||
// (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC));
|
// (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC));
|
||||||
|
|
||||||
ASSERT(last->key == INT64_MIN);
|
ASSERT(last->key == INT64_MIN);
|
||||||
|
|
|
@ -26,11 +26,6 @@ typedef struct SFuncMgtService {
|
||||||
SHashObj* pFuncNameHashTable;
|
SHashObj* pFuncNameHashTable;
|
||||||
} SFuncMgtService;
|
} SFuncMgtService;
|
||||||
|
|
||||||
typedef struct SUdfInfo {
|
|
||||||
SDataType outputDt;
|
|
||||||
int8_t funcType;
|
|
||||||
} SUdfInfo;
|
|
||||||
|
|
||||||
static SFuncMgtService gFunMgtService;
|
static SFuncMgtService gFunMgtService;
|
||||||
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
||||||
static int32_t initFunctionCode = 0;
|
static int32_t initFunctionCode = 0;
|
||||||
|
|
|
@ -559,6 +559,8 @@ static const char* jkScanLogicPlanStableId = "StableId";
|
||||||
static const char* jkScanLogicPlanScanType = "ScanType";
|
static const char* jkScanLogicPlanScanType = "ScanType";
|
||||||
static const char* jkScanLogicPlanScanCount = "ScanCount";
|
static const char* jkScanLogicPlanScanCount = "ScanCount";
|
||||||
static const char* jkScanLogicPlanReverseScanCount = "ReverseScanCount";
|
static const char* jkScanLogicPlanReverseScanCount = "ReverseScanCount";
|
||||||
|
static const char* jkScanLogicPlanDynamicScanFuncs = "DynamicScanFuncs";
|
||||||
|
static const char* jkScanLogicPlanDataRequired = "DataRequired";
|
||||||
static const char* jkScanLogicPlanTagCond = "TagCond";
|
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||||
|
|
||||||
|
@ -590,6 +592,12 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanReverseScanCount, pNode->scanSeq[1]);
|
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanReverseScanCount, pNode->scanSeq[1]);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddObject(pJson, jkScanLogicPlanDynamicScanFuncs, nodeToJson, pNode->pDynamicScanFuncs);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanDataRequired, pNode->dataRequired);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
|
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
|
||||||
}
|
}
|
||||||
|
@ -629,6 +637,12 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetUTinyIntValue(pJson, jkScanLogicPlanReverseScanCount, &pNode->scanSeq[1]);
|
code = tjsonGetUTinyIntValue(pJson, jkScanLogicPlanReverseScanCount, &pNode->scanSeq[1]);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkScanLogicPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetIntValue(pJson, jkScanLogicPlanDataRequired, &pNode->dataRequired);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
|
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1537,6 +1537,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
autoCreateTbl = true;
|
autoCreateTbl = true;
|
||||||
} else if (!existedUsing) {
|
} else if (!existedUsing) {
|
||||||
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
|
CHECK_CODE(getTableMeta(pCxt, tbNum, &name, dbFName));
|
||||||
|
if (TSDB_SUPER_TABLE == pCxt->pTableMeta->tableType) {
|
||||||
|
return buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableDataBlocks* dataBuf = NULL;
|
STableDataBlocks* dataBuf = NULL;
|
||||||
|
@ -2534,7 +2537,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
||||||
if (p) kv = *p;
|
if (p) kv = *p;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (kv){
|
if (kv) {
|
||||||
int32_t colLen = kv->length;
|
int32_t colLen = kv->length;
|
||||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
// uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
// uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||||
|
@ -2547,7 +2550,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
||||||
} else {
|
} else {
|
||||||
MemRowAppend(&pBuf, &(kv->value), colLen, ¶m);
|
MemRowAppend(&pBuf, &(kv->value), colLen, ¶m);
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
pBuilder->hasNone = true;
|
pBuilder->hasNone = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -617,7 +617,7 @@ static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
|
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId || TSDB_SYSTEM_TABLE == pCol->tableType) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return pushDownCondOptBelongThisTable(pNode, pTableCols);
|
return pushDownCondOptBelongThisTable(pNode, pTableCols);
|
||||||
|
|
|
@ -40,6 +40,8 @@ TEST_F(PlanGroupByTest, basic) {
|
||||||
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
|
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
|
||||||
|
|
||||||
run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL");
|
run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL");
|
||||||
|
|
||||||
|
run("SELECT AVG(c1) FROM st1");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PlanGroupByTest, withPartitionBy) {
|
TEST_F(PlanGroupByTest, withPartitionBy) {
|
||||||
|
|
|
@ -99,7 +99,7 @@ typedef struct SFilterRange {
|
||||||
|
|
||||||
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
|
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
|
||||||
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
|
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
|
||||||
typedef bool(*filter_exec_func)(void *, int32_t, int8_t**, SColumnDataAgg *, int16_t);
|
typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||||
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
|
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
|
||||||
|
|
||||||
typedef struct SFilterRangeCompare {
|
typedef struct SFilterRangeCompare {
|
||||||
|
|
|
@ -2976,14 +2976,12 @@ _return:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||||
bool all = true;
|
bool all = true;
|
||||||
uint32_t *unitIdx = NULL;
|
uint32_t *unitIdx = NULL;
|
||||||
|
|
||||||
if (*p == NULL) {
|
int8_t* p = (int8_t*)pRes->pData;
|
||||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
//FILTER_UNIT_CLR_F(info);
|
//FILTER_UNIT_CLR_F(info);
|
||||||
|
@ -3002,35 +3000,35 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
|
||||||
uint8_t optr = cunit->optr;
|
uint8_t optr = cunit->optr;
|
||||||
|
|
||||||
if (colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
|
if (colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
|
||||||
(*p)[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
p[i] = (optr == OP_TYPE_IS_NULL) ? true : false;
|
||||||
} else {
|
} else {
|
||||||
if (optr == OP_TYPE_IS_NOT_NULL) {
|
if (optr == OP_TYPE_IS_NOT_NULL) {
|
||||||
(*p)[i] = 1;
|
p[i] = 1;
|
||||||
} else if (optr == OP_TYPE_IS_NULL) {
|
} else if (optr == OP_TYPE_IS_NULL) {
|
||||||
(*p)[i] = 0;
|
p[i] = 0;
|
||||||
} else if (cunit->rfunc >= 0) {
|
} else if (cunit->rfunc >= 0) {
|
||||||
(*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
||||||
} else {
|
} else {
|
||||||
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
||||||
}
|
}
|
||||||
|
|
||||||
//FILTER_UNIT_SET_R(info, uidx, p[i]);
|
//FILTER_UNIT_SET_R(info, uidx, p[i]);
|
||||||
//FILTER_UNIT_SET_F(info, uidx);
|
//FILTER_UNIT_SET_F(info, uidx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i]) {
|
if (p[i]) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
unitIdx += unitNum;
|
unitIdx += unitNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3040,7 +3038,7 @@ bool filterExecuteBasedOnStatisImpl(void *pinfo, int32_t numOfRows, int8_t** p,
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) {
|
int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, bool* all) {
|
||||||
if (statis && numOfRows >= FILTER_RM_UNIT_MIN_ROWS) {
|
if (statis && numOfRows >= FILTER_RM_UNIT_MIN_ROWS) {
|
||||||
info->blkFlag = 0;
|
info->blkFlag = 0;
|
||||||
|
|
||||||
|
@ -3058,7 +3056,6 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t*
|
||||||
assert(info->unitNum > 1);
|
assert(info->unitNum > 1);
|
||||||
|
|
||||||
*all = filterExecuteBasedOnStatisImpl(info, numOfRows, p, statis, numOfCols);
|
*all = filterExecuteBasedOnStatisImpl(info, numOfRows, p, statis, numOfCols);
|
||||||
|
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3067,59 +3064,55 @@ int32_t filterExecuteBasedOnStatis(SFilterInfo *info, int32_t numOfRows, int8_t*
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
info->blkFlag = 0;
|
info->blkFlag = 0;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
|
||||||
|
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
|
||||||
|
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||||
bool all = true;
|
bool all = true;
|
||||||
|
|
||||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
int8_t* p = (int8_t*)pRes->pData;
|
||||||
return all;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*p == NULL) {
|
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||||
(*p)[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||||
bool all = true;
|
bool all = true;
|
||||||
|
|
||||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*p == NULL) {
|
int8_t* p = (int8_t*)pRes->pData;
|
||||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||||
|
|
||||||
(*p)[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3127,7 +3120,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||||
bool all = true;
|
bool all = true;
|
||||||
uint16_t dataSize = info->cunits[0].dataSize;
|
uint16_t dataSize = info->cunits[0].dataSize;
|
||||||
|
@ -3136,13 +3129,11 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
|
||||||
void *valData2 = info->cunits[0].valData2;
|
void *valData2 = info->cunits[0].valData2;
|
||||||
__compar_fn_t func = gDataCompare[info->cunits[0].func];
|
__compar_fn_t func = gDataCompare[info->cunits[0].func];
|
||||||
|
|
||||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*p == NULL) {
|
int8_t* p = (int8_t*) pRes->pData;
|
||||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
|
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
|
||||||
|
@ -3152,9 +3143,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*p)[i] = (*rfunc)(colData, colData, valData, valData2, func);
|
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3162,23 +3153,21 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||||
bool all = true;
|
bool all = true;
|
||||||
|
|
||||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*p == NULL) {
|
int8_t* p = (int8_t*) pRes->pData;
|
||||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
uint32_t uidx = info->groups[0].unitIdxs[0];
|
uint32_t uidx = info->groups[0].unitIdxs[0];
|
||||||
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
|
||||||
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
|
if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
|
||||||
(*p)[i] = 0;
|
p[i] = 0;
|
||||||
all = false;
|
all = false;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -3191,14 +3180,14 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
|
||||||
qError("castConvert1 taosUcs4ToMbs error");
|
qError("castConvert1 taosUcs4ToMbs error");
|
||||||
}else{
|
}else{
|
||||||
varDataSetLen(newColData, len);
|
varDataSetLen(newColData, len);
|
||||||
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData);
|
p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, newColData, info->cunits[uidx].valData);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(newColData);
|
taosMemoryFreeClear(newColData);
|
||||||
}else{
|
}else{
|
||||||
(*p)[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
|
p[i] = filterDoCompare(gDataCompare[info->cunits[uidx].func], info->cunits[uidx].optr, colData, info->cunits[uidx].valData);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3207,17 +3196,15 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
SFilterInfo *info = (SFilterInfo *)pinfo;
|
SFilterInfo *info = (SFilterInfo *)pinfo;
|
||||||
bool all = true;
|
bool all = true;
|
||||||
|
|
||||||
if (filterExecuteBasedOnStatis(info, numOfRows, p, statis, numOfCols, &all) == 0) {
|
if (filterExecuteBasedOnStatis(info, numOfRows, pRes, statis, numOfCols, &all) == 0) {
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*p == NULL) {
|
int8_t* p = (int8_t*) pRes->pData;
|
||||||
*p = taosMemoryCalloc(numOfRows, sizeof(int8_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
//FILTER_UNIT_CLR_F(info);
|
//FILTER_UNIT_CLR_F(info);
|
||||||
|
@ -3235,14 +3222,14 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
|
||||||
uint8_t optr = cunit->optr;
|
uint8_t optr = cunit->optr;
|
||||||
|
|
||||||
if (colData == NULL || colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
|
if (colData == NULL || colDataIsNull((SColumnInfoData *)(cunit->colData), 0, i, NULL)) {
|
||||||
(*p)[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
|
||||||
} else {
|
} else {
|
||||||
if (optr == OP_TYPE_IS_NOT_NULL) {
|
if (optr == OP_TYPE_IS_NOT_NULL) {
|
||||||
(*p)[i] = 1;
|
p[i] = 1;
|
||||||
} else if (optr == OP_TYPE_IS_NULL) {
|
} else if (optr == OP_TYPE_IS_NULL) {
|
||||||
(*p)[i] = 0;
|
p[i] = 0;
|
||||||
} else if (cunit->rfunc >= 0) {
|
} else if (cunit->rfunc >= 0) {
|
||||||
(*p)[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
p[i] = (*gRangeCompare[cunit->rfunc])(colData, colData, cunit->valData, cunit->valData2, gDataCompare[cunit->func]);
|
||||||
} else {
|
} else {
|
||||||
if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){
|
if(cunit->dataType == TSDB_DATA_TYPE_NCHAR && (cunit->optr == OP_TYPE_MATCH || cunit->optr == OP_TYPE_NMATCH)){
|
||||||
char *newColData = taosMemoryCalloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
|
char *newColData = taosMemoryCalloc(cunit->dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1);
|
||||||
|
@ -3251,11 +3238,11 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
|
||||||
qError("castConvert1 taosUcs4ToMbs error");
|
qError("castConvert1 taosUcs4ToMbs error");
|
||||||
}else{
|
}else{
|
||||||
varDataSetLen(newColData, len);
|
varDataSetLen(newColData, len);
|
||||||
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData);
|
p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, newColData, cunit->valData);
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(newColData);
|
taosMemoryFreeClear(newColData);
|
||||||
}else{
|
}else{
|
||||||
(*p)[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
p[i] = filterDoCompare(gDataCompare[cunit->func], cunit->optr, colData, cunit->valData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3263,17 +3250,17 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDataAg
|
||||||
//FILTER_UNIT_SET_F(info, uidx);
|
//FILTER_UNIT_SET_F(info, uidx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i]) {
|
if (p[i]) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*p)[i] == 0) {
|
if (p[i] == 0) {
|
||||||
all = false;
|
all = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4026,37 +4013,62 @@ _return:
|
||||||
FLT_RET(code);
|
FLT_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SColumnDataAgg *statis, int16_t numOfCols, int32_t *pResultStatus) {
|
||||||
if (NULL == info) {
|
if (NULL == info) {
|
||||||
|
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SScalarParam output = {0};
|
||||||
|
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||||
|
|
||||||
|
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info->scalarMode) {
|
if (info->scalarMode) {
|
||||||
SScalarParam output = {0};
|
|
||||||
|
|
||||||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
|
||||||
int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray *pList = taosArrayInit(1, POINTER_BYTES);
|
SArray *pList = taosArrayInit(1, POINTER_BYTES);
|
||||||
taosArrayPush(pList, &pSrc);
|
taosArrayPush(pList, &pSrc);
|
||||||
|
|
||||||
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
|
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
|
||||||
*p = taosMemoryMalloc(output.numOfRows * sizeof(bool));
|
*p = output.columnData;
|
||||||
|
|
||||||
memcpy(*p, output.columnData->pData, output.numOfRows);
|
|
||||||
colDataDestroy(output.columnData);
|
|
||||||
taosMemoryFree(output.columnData);
|
|
||||||
|
|
||||||
taosArrayDestroy(pList);
|
taosArrayDestroy(pList);
|
||||||
|
|
||||||
|
if (output.numOfQualified == output.numOfRows) {
|
||||||
|
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
|
||||||
|
} else if (output.numOfQualified == 0) {
|
||||||
|
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
|
||||||
|
} else {
|
||||||
|
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
|
} else {
|
||||||
|
*p = output.columnData;
|
||||||
|
output.numOfRows = pSrc->info.rows;
|
||||||
|
|
||||||
|
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols);
|
||||||
|
|
||||||
|
// todo this should be return during filter procedure
|
||||||
|
int32_t num = 0;
|
||||||
|
for(int32_t i = 0; i < output.numOfRows; ++i) {
|
||||||
|
if (((int8_t*)((*p)->pData))[i] == 1) {
|
||||||
|
++num;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return (*info->func)(info, pSrc->info.rows, p, statis, numOfCols);
|
if (num == output.numOfRows) {
|
||||||
}
|
*pResultStatus = FILTER_RESULT_ALL_QUALIFIED;
|
||||||
|
} else if (num == 0) {
|
||||||
|
*pResultStatus = FILTER_RESULT_NONE_QUALIFIED;
|
||||||
|
} else {
|
||||||
|
*pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED;
|
||||||
|
}
|
||||||
|
|
||||||
|
return keep;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct SClassifyConditionCxt {
|
typedef struct SClassifyConditionCxt {
|
||||||
bool hasPrimaryKey;
|
bool hasPrimaryKey;
|
||||||
|
|
|
@ -606,6 +606,8 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
SCL_ERR_JRET(code);
|
SCL_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t numOfQualified = 0;
|
||||||
|
|
||||||
bool value = false;
|
bool value = false;
|
||||||
bool complete = true;
|
bool complete = true;
|
||||||
for (int32_t i = 0; i < rowNum; ++i) {
|
for (int32_t i = 0; i < rowNum; ++i) {
|
||||||
|
@ -631,6 +633,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
|
|
||||||
if (complete) {
|
if (complete) {
|
||||||
colDataAppend(output->columnData, i, (char*) &value, false);
|
colDataAppend(output->columnData, i, (char*) &value, false);
|
||||||
|
if (value) {
|
||||||
|
numOfQualified++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -639,8 +644,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
|
||||||
output->numOfRows = 0;
|
output->numOfRows = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
output->numOfQualified = numOfQualified;
|
||||||
|
|
||||||
|
_return:
|
||||||
sclFreeParamList(params, paramNum);
|
sclFreeParamList(params, paramNum);
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -1242,6 +1248,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
|
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows);
|
||||||
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
|
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
|
||||||
pDst->numOfRows = res->numOfRows;
|
pDst->numOfRows = res->numOfRows;
|
||||||
|
pDst->numOfQualified = res->numOfQualified;
|
||||||
}
|
}
|
||||||
|
|
||||||
sclFreeParam(res);
|
sclFreeParam(res);
|
||||||
|
@ -1249,7 +1256,6 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
//nodesDestroyNode(pNode);
|
|
||||||
sclFreeRes(ctx.pRes);
|
sclFreeRes(ctx.pRes);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1475,19 +1475,19 @@ void vectorMathMinus(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
|
||||||
|
|
||||||
void vectorAssign(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
|
void vectorAssign(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
|
||||||
SColumnInfoData *pOutputCol = pOut->columnData;
|
SColumnInfoData *pOutputCol = pOut->columnData;
|
||||||
|
|
||||||
pOut->numOfRows = pLeft->numOfRows;
|
pOut->numOfRows = pLeft->numOfRows;
|
||||||
|
|
||||||
// if (IS_HELPER_NULL(pRight->columnData, 0)) {
|
|
||||||
if(colDataIsNull_s(pRight->columnData, 0)){
|
if(colDataIsNull_s(pRight->columnData, 0)){
|
||||||
for (int32_t i = 0; i < pOut->numOfRows; ++i) {
|
colDataAppendNNULL(pOutputCol, 0, pOut->numOfRows);
|
||||||
colDataAppend(pOutputCol, i, NULL, true);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
char* d = colDataGetData(pRight->columnData, 0);
|
||||||
for (int32_t i = 0; i < pOut->numOfRows; ++i) {
|
for (int32_t i = 0; i < pOut->numOfRows; ++i) {
|
||||||
colDataAppend(pOutputCol, i, colDataGetData(pRight->columnData, 0), false);
|
colDataAppend(pOutputCol, i, d, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pRight->numOfQualified == 1 || pRight->numOfQualified == 0);
|
||||||
|
pOut->numOfQualified = pRight->numOfQualified * pOut->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
|
void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
|
||||||
|
@ -1646,38 +1646,60 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
|
||||||
doReleaseVec(pRightCol, rightConvert);
|
doReleaseVec(pRightCol, rightConvert);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define VEC_COM_INNER(pCol, index1, index2) \
|
int32_t doVectorCompareImpl(int32_t numOfRows, SScalarParam *pOut, int32_t startIndex, int32_t step, __compar_fn_t fp,
|
||||||
for (; i < pCol->numOfRows && i >= 0; i += step) {\
|
SScalarParam *pLeft, SScalarParam *pRight, int32_t optr) {
|
||||||
if (IS_HELPER_NULL(pLeft->columnData, index1) || IS_HELPER_NULL(pRight->columnData, index2)) {\
|
int32_t num = 0;
|
||||||
bool res = false;\
|
|
||||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\
|
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
|
||||||
continue;\
|
int32_t leftIndex = (i >= pLeft->numOfRows)? 0:i;
|
||||||
}\
|
int32_t rightIndex = (i >= pRight->numOfRows)? 0:i;
|
||||||
char *pLeftData = colDataGetData(pLeft->columnData, index1);\
|
|
||||||
char *pRightData = colDataGetData(pRight->columnData, index2);\
|
if (IS_HELPER_NULL(pLeft->columnData, leftIndex) || IS_HELPER_NULL(pRight->columnData, rightIndex)) {
|
||||||
int64_t leftOut = 0;\
|
bool res = false;
|
||||||
int64_t rightOut = 0;\
|
colDataAppendInt8(pOut->columnData, i, (int8_t *)&res);
|
||||||
bool freeLeft = false;\
|
continue;
|
||||||
bool freeRight = false;\
|
|
||||||
bool isJsonnull = false;\
|
|
||||||
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight),\
|
|
||||||
&pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);\
|
|
||||||
if(isJsonnull){\
|
|
||||||
ASSERT(0);\
|
|
||||||
}\
|
|
||||||
if(!pLeftData || !pRightData){\
|
|
||||||
result = false;\
|
|
||||||
}\
|
|
||||||
if(!result){\
|
|
||||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);\
|
|
||||||
}else{\
|
|
||||||
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);\
|
|
||||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\
|
|
||||||
}\
|
|
||||||
if(freeLeft) taosMemoryFreeClear(pLeftData);\
|
|
||||||
if(freeRight) taosMemoryFreeClear(pRightData);\
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char * pLeftData = colDataGetData(pLeft->columnData, leftIndex);
|
||||||
|
char * pRightData = colDataGetData(pRight->columnData, rightIndex);
|
||||||
|
int64_t leftOut = 0;
|
||||||
|
int64_t rightOut = 0;
|
||||||
|
bool freeLeft = false;
|
||||||
|
bool freeRight = false;
|
||||||
|
bool isJsonnull = false;
|
||||||
|
|
||||||
|
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData,
|
||||||
|
&leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);
|
||||||
|
if (isJsonnull) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pLeftData || !pRightData) {
|
||||||
|
result = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
colDataAppendInt8(pOut->columnData, i, (int8_t *)&result);
|
||||||
|
} else {
|
||||||
|
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);
|
||||||
|
colDataAppendInt8(pOut->columnData, i, (int8_t *)&res);
|
||||||
|
if (res) {
|
||||||
|
++num;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (freeLeft) {
|
||||||
|
taosMemoryFreeClear(pLeftData);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (freeRight) {
|
||||||
|
taosMemoryFreeClear(pRightData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
|
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
|
||||||
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
|
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
|
||||||
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
|
||||||
|
@ -1704,16 +1726,12 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *
|
||||||
char *pLeftData = colDataGetData(pLeft->columnData, i);
|
char *pLeftData = colDataGetData(pLeft->columnData, i);
|
||||||
bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter);
|
bool res = filterDoCompare(fp, optr, pLeftData, pRight->pHashFilter);
|
||||||
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
|
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
|
||||||
|
if (res) {
|
||||||
|
pOut->numOfQualified++;
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
} else { // normal compare
|
||||||
if (pLeft->numOfRows == pRight->numOfRows) {
|
pOut->numOfQualified = doVectorCompareImpl(pOut->numOfRows, pOut, i, step, fp, pLeft, pRight, optr);
|
||||||
VEC_COM_INNER(pLeft, i, i)
|
|
||||||
} else if (pRight->numOfRows == 1) {
|
|
||||||
VEC_COM_INNER(pLeft, i, 0)
|
|
||||||
} else if (pLeft->numOfRows == 1) {
|
|
||||||
VEC_COM_INNER(pRight, 0, i)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -595,11 +595,11 @@ class TDTestCase:
|
||||||
tdSql.checkData(2, i, 15)
|
tdSql.checkData(2, i, 15)
|
||||||
|
|
||||||
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(linear)")
|
tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(linear)")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(3)
|
||||||
tdSql.checkCols(4)
|
tdSql.checkCols(4)
|
||||||
|
|
||||||
for i in range (tdSql.queryCols):
|
for i in range (tdSql.queryCols):
|
||||||
tdSql.checkData(0, i, 15)
|
tdSql.checkData(0, i, 13)
|
||||||
|
|
||||||
tdLog.printNoPrefix("==========step10:test error cases")
|
tdLog.printNoPrefix("==========step10:test error cases")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue