refactor: do some internal refactor and opt query performance.
This commit is contained in:
parent
751129c5c1
commit
78bbfcb4bc
|
@ -1404,7 +1404,7 @@ typedef struct STableScanAnalyzeInfo {
|
||||||
uint32_t skipBlocks;
|
uint32_t skipBlocks;
|
||||||
uint32_t filterOutBlocks;
|
uint32_t filterOutBlocks;
|
||||||
double elapsedTime;
|
double elapsedTime;
|
||||||
uint64_t filterTime;
|
double filterTime;
|
||||||
} STableScanAnalyzeInfo;
|
} STableScanAnalyzeInfo;
|
||||||
|
|
||||||
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
|
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
|
||||||
|
|
|
@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
for(int32_t i = 0; i < 1000; i += 20) {
|
for(int32_t i = 0; i < 100000; i += 20) {
|
||||||
char sql[1024] = {0};
|
char sql[1024] = {0};
|
||||||
sprintf(sql,
|
sprintf(sql,
|
||||||
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||||
|
@ -154,7 +154,7 @@ TEST(testCase, driverInit_Test) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(testCase, connect_Test) {
|
TEST(testCase, connect_Test) {
|
||||||
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
|
taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/tdengine/sim/dnode1/cfg");
|
||||||
|
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
|
@ -501,7 +501,6 @@ TEST(testCase, show_vgroup_Test) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST(testCase, create_multiple_tables) {
|
TEST(testCase, create_multiple_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
@ -665,6 +664,7 @@ TEST(testCase, insert_test) {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
TEST(testCase, projection_query_tables) {
|
TEST(testCase, projection_query_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
@ -697,7 +697,7 @@ TEST(testCase, projection_query_tables) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
for(int32_t i = 0; i < 100; ++i) {
|
for(int32_t i = 0; i < 1; ++i) {
|
||||||
printf("create table :%d\n", i);
|
printf("create table :%d\n", i);
|
||||||
createNewTable(pConn, i);
|
createNewTable(pConn, i);
|
||||||
}
|
}
|
||||||
|
@ -723,6 +723,7 @@ TEST(testCase, projection_query_tables) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
TEST(testCase, projection_query_stables) {
|
TEST(testCase, projection_query_stables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
@ -820,21 +821,8 @@ TEST(testCase, async_api_test) {
|
||||||
getchar();
|
getchar();
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
TEST(testCase, update_test) {
|
TEST(testCase, update_test) {
|
||||||
|
|
||||||
SInterval interval = {0};
|
|
||||||
interval.offset = 8000;
|
|
||||||
interval.interval = 10000;
|
|
||||||
interval.sliding = 4000;
|
|
||||||
interval.intervalUnit = 's';
|
|
||||||
interval.offsetUnit = 's';
|
|
||||||
interval.slidingUnit = 's';
|
|
||||||
// STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1630000000000);
|
|
||||||
STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1629999999999);
|
|
||||||
|
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
@ -869,4 +857,8 @@ TEST(testCase, update_test) {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -103,7 +103,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
|
||||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||||
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
|
bool hasRemainResults(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
|
|
|
@ -297,6 +297,20 @@ enum {
|
||||||
TABLE_SCAN__BLOCK_ORDER = 2,
|
TABLE_SCAN__BLOCK_ORDER = 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct SAggSupporter {
|
||||||
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
|
char* keyBuf; // window key buffer
|
||||||
|
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
|
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||||
|
} SAggSupporter;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||||
|
SInterval interval;
|
||||||
|
SAggSupporter *pAggSup;
|
||||||
|
SExprSupp *pExprSup; // expr supporter of aggregate operator
|
||||||
|
} SAggOptrPushDownInfo;
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
STsdbReader* dataReader;
|
STsdbReader* dataReader;
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
|
@ -312,12 +326,13 @@ typedef struct STableScanInfo {
|
||||||
SQueryTableDataCond cond;
|
SQueryTableDataCond cond;
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
int32_t currentGroupId;
|
int32_t currentGroupId;
|
||||||
int32_t currentTable;
|
int32_t currentTable;
|
||||||
int8_t scanMode;
|
int8_t scanMode;
|
||||||
int8_t noTable;
|
int8_t noTable;
|
||||||
|
SAggOptrPushDownInfo pdInfo;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
|
@ -504,13 +519,6 @@ typedef struct SOptrBasicInfo {
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
} SOptrBasicInfo;
|
} SOptrBasicInfo;
|
||||||
|
|
||||||
typedef struct SAggSupporter {
|
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
|
||||||
char* keyBuf; // window key buffer
|
|
||||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
|
||||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
|
||||||
} SAggSupporter;
|
|
||||||
|
|
||||||
typedef struct SIntervalAggOperatorInfo {
|
typedef struct SIntervalAggOperatorInfo {
|
||||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||||
SOptrBasicInfo binfo; // basic info
|
SOptrBasicInfo binfo; // basic info
|
||||||
|
|
|
@ -137,7 +137,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
|
||||||
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo) {
|
bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
|
||||||
if (pGroupResInfo->pRows == NULL) {
|
if (pGroupResInfo->pRows == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,8 +141,7 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock,
|
||||||
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
|
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||||
uint64_t groupId);
|
|
||||||
|
|
||||||
// setup the output buffer for each operator
|
// setup the output buffer for each operator
|
||||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||||
|
@ -1393,10 +1392,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
|
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
||||||
uint64_t groupId) {
|
|
||||||
// for simple group by query without interval, all the tables belong to one group result.
|
// for simple group by query without interval, all the tables belong to one group result.
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
|
||||||
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
||||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||||
|
@ -1420,14 +1420,13 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggIn
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
|
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
||||||
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
|
||||||
qDebug("page_setbuf, groupId:%" PRIu64, groupId);
|
doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
|
||||||
#endif
|
|
||||||
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
|
||||||
|
|
||||||
// record the current active group id
|
// record the current active group id
|
||||||
pAggInfo->groupId = groupId;
|
pAggInfo->groupId = groupId;
|
||||||
|
@ -1594,7 +1593,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
pBlock->info.version = pTaskInfo->version;
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
if (!hasDataInGroupInfo(pGroupResInfo)) {
|
if (!hasRemainResults(pGroupResInfo)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2931,7 +2930,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
|
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId);
|
||||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
|
||||||
code = doAggregateImpl(pOperator, pSup->pCtx);
|
code = doAggregateImpl(pOperator, pSup->pCtx);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -2966,7 +2965,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
||||||
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);
|
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);
|
||||||
|
|
||||||
if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) {
|
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3356,7 +3355,6 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
pOperator->numOfDownstream = 0;
|
pOperator->numOfDownstream = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupExprSupp(&pOperator->exprSupp);
|
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3501,7 +3499,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->groupId = INT32_MIN;
|
pInfo->groupId = UINT64_MAX;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->pCondition = pCondition;
|
||||||
pOperator->name = "TableAggregate";
|
pOperator->name = "TableAggregate";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
|
||||||
|
@ -3513,6 +3511,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
||||||
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
|
STableScanInfo* pTableScanInfo = downstream->info;
|
||||||
|
pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
|
||||||
|
pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
|
||||||
|
}
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -301,8 +301,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pRes, NULL);
|
doFilter(pInfo->pCondition, pRes, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
if (!hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
if (!hasRemain) {
|
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,6 +166,67 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this function is for table scanner to extract temporary results of upstream aggregate results.
|
||||||
|
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
|
||||||
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t buf[2] = {0};
|
||||||
|
SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);
|
||||||
|
|
||||||
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
|
SResultRowPosition* p1 =
|
||||||
|
(SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||||
|
|
||||||
|
if (p1 == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId);
|
||||||
|
return (SResultRow*)((char*)(*pPage) + p1->offset);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
|
||||||
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
|
if (pTableScanInfo->pdInfo.pExprSup == NULL) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup;
|
||||||
|
|
||||||
|
SFilePage* pPage = NULL;
|
||||||
|
SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage);
|
||||||
|
|
||||||
|
if (pRow == NULL) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool notLoadBlock = true;
|
||||||
|
for (int32_t i = 0; i < pSup1->numOfExprs; ++i) {
|
||||||
|
int32_t functionId = pSup1->pCtx[i].functionId;
|
||||||
|
|
||||||
|
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset);
|
||||||
|
|
||||||
|
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
|
||||||
|
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
|
notLoadBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// release buffer pages
|
||||||
|
releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage);
|
||||||
|
|
||||||
|
if (notLoadBlock) {
|
||||||
|
*status = FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
uint32_t* status) {
|
uint32_t* status) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -178,7 +239,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
|
|
||||||
*status = pInfo->dataBlockLoadFlag;
|
*status = pInfo->dataBlockLoadFlag;
|
||||||
if (pTableScanInfo->pFilterNode != NULL ||
|
if (pTableScanInfo->pFilterNode != NULL ||
|
||||||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
|
overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
|
||||||
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,6 +293,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
|
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
|
||||||
|
|
||||||
// todo filter data block according to the block sma data firstly
|
// todo filter data block according to the block sma data firstly
|
||||||
|
|
||||||
|
doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
|
||||||
|
if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
pCost->skipBlocks += 1;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
||||||
pCost->filterOutBlocks += 1;
|
pCost->filterOutBlocks += 1;
|
||||||
|
@ -263,18 +334,20 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
int64_t st = taosGetTimestampUs();
|
||||||
|
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
||||||
|
|
||||||
int64_t et = taosGetTimestampMs();
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
pTableScanInfo->readRecorder.filterTime += (et - st);
|
pTableScanInfo->readRecorder.filterTime += el;
|
||||||
|
|
||||||
if (pBlock->info.rows == 0) {
|
if (pBlock->info.rows == 0) {
|
||||||
pCost->filterOutBlocks += 1;
|
pCost->filterOutBlocks += 1;
|
||||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st));
|
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -602,11 +675,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||||
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
|
pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
|
||||||
|
pInfo->cond.order = TSDB_ORDER_DESC;
|
||||||
|
|
||||||
|
pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode);
|
||||||
pInfo->readHandle = *readHandle;
|
pInfo->readHandle = *readHandle;
|
||||||
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
|
||||||
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
pInfo->sample.sampleRatio = pTableScanNode->ratio;
|
||||||
pInfo->sample.seed = taosGetTimestampSec();
|
pInfo->sample.seed = taosGetTimestampSec();
|
||||||
|
|
||||||
|
@ -1484,14 +1558,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->tqReader = pHandle->tqReader;
|
pInfo->tqReader = pHandle->tqReader;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTSInfo->interval.interval > 0) {
|
if (pTSInfo->pdInfo.interval.interval > 0) {
|
||||||
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark);
|
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->pdInfo.interval, pInfo->twAggSup.waterMark);
|
||||||
} else {
|
} else {
|
||||||
pInfo->pUpdateInfo = NULL;
|
pInfo->pUpdateInfo = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pTableScanOp = pTableScanOp;
|
pInfo->pTableScanOp = pTableScanOp;
|
||||||
pInfo->interval = pTSInfo->interval;
|
pInfo->interval = pTSInfo->pdInfo.interval;
|
||||||
|
|
||||||
pInfo->readHandle = *pHandle;
|
pInfo->readHandle = *pHandle;
|
||||||
pInfo->tableUid = pScanPhyNode->uid;
|
pInfo->tableUid = pScanPhyNode->uid;
|
||||||
|
@ -2667,16 +2741,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
int64_t st = taosGetTimestampMs();
|
||||||
|
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
||||||
|
|
||||||
int64_t et = taosGetTimestampMs();
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
pTableScanInfo->readRecorder.filterTime += (et - st);
|
pTableScanInfo->readRecorder.filterTime += el;
|
||||||
|
|
||||||
if (pBlock->info.rows == 0) {
|
if (pBlock->info.rows == 0) {
|
||||||
pCost->filterOutBlocks += 1;
|
pCost->filterOutBlocks += 1;
|
||||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
|
||||||
|
} else {
|
||||||
|
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -1216,7 +1216,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
@ -1256,7 +1256,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
@ -1293,7 +1293,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pBlock, NULL);
|
doFilter(pInfo->pCondition, pBlock, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
@ -1562,7 +1562,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
qDebug("===stream===single interval is done");
|
qDebug("===stream===single interval is done");
|
||||||
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||||
|
@ -2009,7 +2009,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
@ -2052,7 +2052,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
|
@ -2210,7 +2210,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// if (pOperator->status == OP_RES_TO_RETURN) {
|
// if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
|
// if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) {
|
||||||
// doSetOperatorCompleted(pOperator);
|
// doSetOperatorCompleted(pOperator);
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
|
@ -3823,7 +3823,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
|
@ -4379,7 +4379,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
printDataBlock(pBInfo->pRes, "single state");
|
printDataBlock(pBInfo->pRes, "single state");
|
||||||
|
|
|
@ -118,6 +118,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t getFirstLastInfoSize(int32_t resBytes);
|
int32_t getFirstLastInfoSize(int32_t resBytes);
|
||||||
|
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow);
|
||||||
|
|
||||||
int32_t lastRowFunction(SqlFunctionCtx *pCtx);
|
int32_t lastRowFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
|
|
|
@ -2312,6 +2312,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.type = FUNCTION_TYPE_LAST,
|
.type = FUNCTION_TYPE_LAST,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||||
.translateFunc = translateFirstLast,
|
.translateFunc = translateFirstLast,
|
||||||
|
.dynDataRequiredFunc = lastDynDataReq,
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = lastFunction,
|
.processFunc = lastFunction,
|
||||||
|
|
|
@ -2702,6 +2702,22 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
|
||||||
|
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes;
|
||||||
|
|
||||||
|
// not initialized yet, data is required
|
||||||
|
if (pEntry == NULL) {
|
||||||
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
}
|
||||||
|
|
||||||
|
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
|
||||||
|
if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) {
|
||||||
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
|
} else {
|
||||||
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; }
|
int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; }
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
|
|
@ -107,7 +107,12 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow*
|
||||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow);
|
|
||||||
|
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
|
||||||
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
} else {
|
||||||
|
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
|
|
Loading…
Reference in New Issue