Merge pull request #14041 from taosdata/feature/3_liaohj
feature(query): support interp function, and do some internal refactor.
This commit is contained in:
commit
e9b0d93907
|
@ -887,6 +887,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
|
|
||||||
SRequestObj *pRequest = res;
|
SRequestObj *pRequest = res;
|
||||||
pRequest->body.fetchFp = fp;
|
pRequest->body.fetchFp = fp;
|
||||||
|
pRequest->body.param = param;
|
||||||
|
|
||||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||||
if (taos_num_fields(pRequest) == 0) {
|
if (taos_num_fields(pRequest) == 0) {
|
||||||
|
|
|
@ -43,6 +43,26 @@ void showDB(TAOS* pConn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void printResult(TAOS_RES* pRes) {
|
||||||
|
TAOS_ROW pRow = NULL;
|
||||||
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
|
||||||
|
int32_t n = 0;
|
||||||
|
char str[512] = {0};
|
||||||
|
while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||||
|
int32_t* length = taos_fetch_lengths(pRes);
|
||||||
|
for(int32_t i = 0; i < numOfFields; ++i) {
|
||||||
|
printf("(%d):%d " , i, length[i]);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
|
||||||
|
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||||
|
printf("%s\n", str);
|
||||||
|
memset(str, 0, sizeof(str));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
||||||
#if 0
|
#if 0
|
||||||
printf("numOfRow = %d \n", numOfRow);
|
printf("numOfRow = %d \n", numOfRow);
|
||||||
|
@ -729,48 +749,31 @@ TEST(testCase, projection_query_tables) {
|
||||||
// taos_close(pConn);
|
// taos_close(pConn);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
//TEST(testCase, agg_query_tables) {
|
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
|
||||||
// ASSERT_NE(pConn, nullptr);
|
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
|
||||||
// if (taos_errno(pRes) != 0) {
|
|
||||||
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// ASSERT_TRUE(false);
|
|
||||||
// }
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
//
|
|
||||||
// pRes = taos_query(pConn, "show stables");
|
|
||||||
// if (taos_errno(pRes) != 0) {
|
|
||||||
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// ASSERT_TRUE(false);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// TAOS_ROW pRow = NULL;
|
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
|
||||||
//
|
|
||||||
// int32_t n = 0;
|
|
||||||
// char str[512] = {0};
|
|
||||||
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
|
||||||
// int32_t* length = taos_fetch_lengths(pRes);
|
|
||||||
// for(int32_t i = 0; i < numOfFields; ++i) {
|
|
||||||
// printf("(%d):%d " , i, length[i]);
|
|
||||||
// }
|
|
||||||
// printf("\n");
|
|
||||||
//
|
|
||||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
|
||||||
// printf("%s\n", str);
|
|
||||||
// memset(str, 0, sizeof(str));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
|
TEST(testCase, agg_query_tables) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
|
ASSERT_TRUE(false);
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "show table distributed st1");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
|
ASSERT_TRUE(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
printResult(pRes);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
/*
|
/*
|
||||||
--- copy the following script in the shell to setup the environment ---
|
--- copy the following script in the shell to setup the environment ---
|
||||||
|
|
||||||
|
@ -786,7 +789,7 @@ TEST(testCase, async_api_test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
taos_query(pConn, "use table_alltype_hyperloglog");
|
taos_query(pConn, "use abc1");
|
||||||
#if 0
|
#if 0
|
||||||
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
|
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
|
@ -812,7 +815,7 @@ TEST(testCase, async_api_test) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
taos_query_a(pConn, "select HYPERLOGLOG(q_ts) from stable_1_2 where ts between 1630000001000 and 1630100001000 interval(19d) Fill(NONE);", queryCallback, pConn);
|
taos_query_a(pConn, "select count(*) from tu", queryCallback, pConn);
|
||||||
getchar();
|
getchar();
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,4 +123,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
||||||
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
|
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
|
||||||
|
|
||||||
|
int32_t convertFillType(int32_t mode);
|
||||||
|
|
||||||
#endif // TDENGINE_QUERYUTIL_H
|
#endif // TDENGINE_QUERYUTIL_H
|
||||||
|
|
|
@ -566,13 +566,14 @@ typedef struct SStreamSessionAggOperatorInfo {
|
||||||
} SStreamSessionAggOperatorInfo;
|
} SStreamSessionAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct STimeSliceOperatorInfo {
|
typedef struct STimeSliceOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SSDataBlock* pRes;
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
SInterval interval;
|
SInterval interval;
|
||||||
int64_t current;
|
int64_t current;
|
||||||
SArray* pPrevRow; // SArray<SGroupValue>
|
SArray* pPrevRow; // SArray<SGroupValue>
|
||||||
SArray* pCols; // SArray<SColumn>
|
|
||||||
int32_t fillType; // fill type
|
int32_t fillType; // fill type
|
||||||
|
SColumn tsCol; // primary timestamp column
|
||||||
|
SExprSupp scalarSup; // scalar calculation
|
||||||
struct SFillColInfo* pFillColInfo; // fill column info
|
struct SFillColInfo* pFillColInfo; // fill column info
|
||||||
} STimeSliceOperatorInfo;
|
} STimeSliceOperatorInfo;
|
||||||
|
|
||||||
|
@ -670,7 +671,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
||||||
|
|
||||||
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
|
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
|
||||||
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
|
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
|
||||||
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
|
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
|
||||||
void cleanupExprSup(SExprSupp* pSup);
|
void cleanupExprSup(SExprSupp* pSup);
|
||||||
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||||
const char* pkey);
|
const char* pkey);
|
||||||
|
@ -757,8 +758,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, /*SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, */SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
|
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -694,4 +694,32 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
||||||
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
|
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
|
||||||
taosMemoryFree(pCond->twindows);
|
taosMemoryFree(pCond->twindows);
|
||||||
taosMemoryFree(pCond->colList);
|
taosMemoryFree(pCond->colList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t convertFillType(int32_t mode) {
|
||||||
|
int32_t type = TSDB_FILL_NONE;
|
||||||
|
switch (mode) {
|
||||||
|
case FILL_MODE_PREV:
|
||||||
|
type = TSDB_FILL_PREV;
|
||||||
|
break;
|
||||||
|
case FILL_MODE_NONE:
|
||||||
|
type = TSDB_FILL_NONE;
|
||||||
|
break;
|
||||||
|
case FILL_MODE_NULL:
|
||||||
|
type = TSDB_FILL_NULL;
|
||||||
|
break;
|
||||||
|
case FILL_MODE_NEXT:
|
||||||
|
type = TSDB_FILL_NEXT;
|
||||||
|
break;
|
||||||
|
case FILL_MODE_VALUE:
|
||||||
|
type = TSDB_FILL_SET_VALUE;
|
||||||
|
break;
|
||||||
|
case FILL_MODE_LINEAR:
|
||||||
|
type = TSDB_FILL_LINEAR;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
type = TSDB_FILL_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
|
@ -2754,7 +2754,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, num, &pOperator->exprSupp.rowEntryInfoOffset);
|
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
|
if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
|
||||||
|
@ -2762,7 +2766,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
|
code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -2783,12 +2787,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
|
|
||||||
pOperator->name = "SortedMerge";
|
pOperator->name = "SortedMerge";
|
||||||
// pOperator->operatorType = OP_SortedMerge;
|
// pOperator->operatorType = OP_SortedMerge;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->exprSupp.numOfExprs = num;
|
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
|
||||||
|
@ -3408,7 +3409,11 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||||
|
|
||||||
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||||
const char* pkey) {
|
const char* pkey) {
|
||||||
initExprSupp(pSup, pExprInfo, numOfCols);
|
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
|
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
|
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
|
||||||
|
@ -3431,12 +3436,17 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
initResultRowInfo(&pInfo->resultRowInfo);
|
initResultRowInfo(&pInfo->resultRowInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
|
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
|
||||||
pSup->pExprInfo = pExprInfo;
|
pSup->pExprInfo = pExprInfo;
|
||||||
pSup->numOfExprs = numOfExpr;
|
pSup->numOfExprs = numOfExpr;
|
||||||
if (pSup->pExprInfo != NULL) {
|
if (pSup->pExprInfo != NULL) {
|
||||||
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
|
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
|
||||||
|
if (pSup->pCtx == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
@ -3458,7 +3468,10 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
initBasicInfo(&pInfo->binfo, pResultBlock);
|
initBasicInfo(&pInfo->binfo, pResultBlock);
|
||||||
initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
|
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->groupId = INT32_MIN;
|
pInfo->groupId = INT32_MIN;
|
||||||
pOperator->name = "TableAggregate";
|
pOperator->name = "TableAggregate";
|
||||||
|
@ -3723,13 +3736,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
|
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
|
||||||
|
|
||||||
if (pPhyNode->pExprs != NULL) {
|
if (pPhyNode->pExprs != NULL) {
|
||||||
SExprSupp* pSup1 = &pInfo->scalarSup;
|
int32_t num = 0;
|
||||||
pSup1->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup1->numOfExprs);
|
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
|
||||||
pSup1->pCtx = createSqlFunctionCtx(pSup1->pExprInfo, pSup1->numOfExprs, &pSup1->rowEntryInfoOffset);
|
int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
|
||||||
;
|
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
@ -3746,15 +3761,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
|
|
||||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||||
|
|
||||||
pOperator->name = "IndefinitOperator";
|
pOperator->name = "IndefinitOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfExpr;
|
pOperator->exprSupp.numOfExprs = numOfExpr;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
@ -3795,34 +3809,6 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t convertFillType(int32_t mode) {
|
|
||||||
int32_t type = TSDB_FILL_NONE;
|
|
||||||
switch (mode) {
|
|
||||||
case FILL_MODE_PREV:
|
|
||||||
type = TSDB_FILL_PREV;
|
|
||||||
break;
|
|
||||||
case FILL_MODE_NONE:
|
|
||||||
type = TSDB_FILL_NONE;
|
|
||||||
break;
|
|
||||||
case FILL_MODE_NULL:
|
|
||||||
type = TSDB_FILL_NULL;
|
|
||||||
break;
|
|
||||||
case FILL_MODE_NEXT:
|
|
||||||
type = TSDB_FILL_NEXT;
|
|
||||||
break;
|
|
||||||
case FILL_MODE_VALUE:
|
|
||||||
type = TSDB_FILL_SET_VALUE;
|
|
||||||
break;
|
|
||||||
case FILL_MODE_LINEAR:
|
|
||||||
type = TSDB_FILL_LINEAR;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
type = TSDB_FILL_NONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||||
|
@ -3852,10 +3838,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->exprSupp.numOfExprs = num;
|
pOperator->exprSupp.numOfExprs = num;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||||
|
@ -4282,6 +4268,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
|
pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
|
||||||
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
|
||||||
|
pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -387,11 +387,12 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
pInfo->pGroupCols = pGroupColList;
|
pInfo->pGroupCols = pGroupColList;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->pCondition = pCondition;
|
||||||
|
|
||||||
pInfo->scalarSup.pExprInfo = pScalarExprInfo;
|
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
|
||||||
pInfo->scalarSup.numOfExprs = numOfScalarExpr;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset);
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -718,10 +719,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||||
|
|
||||||
if (pPartNode->pExprs != NULL) {
|
if (pPartNode->pExprs != NULL) {
|
||||||
pInfo->scalarSup.numOfExprs = 0;
|
int32_t num = 0;
|
||||||
pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs);
|
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
|
||||||
pInfo->scalarSup.pCtx = createSqlFunctionCtx(
|
int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
|
||||||
pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
|
@ -686,7 +686,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
|
||||||
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
|
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
pOperator->name = "DataBlockDistScanOperator";
|
pOperator->name = "DataBlockDistScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
|
||||||
|
@ -1872,7 +1875,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
|
|
||||||
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->pTableList = pTableListInfo;
|
pInfo->pTableList = pTableListInfo;
|
||||||
pInfo->pColMatchInfo = colList;
|
pInfo->pColMatchInfo = colList;
|
||||||
|
|
|
@ -1705,7 +1705,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
return (rows == 0) ? NULL : pBInfo->pRes;
|
return (rows == 0) ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) {
|
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
@ -1715,108 +1715,53 @@ static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock
|
||||||
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
|
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
|
||||||
|
|
||||||
pkey->isNull = false;
|
pkey->isNull = false;
|
||||||
char* val = colDataGetData(pColInfoData, i);
|
char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
memcpy(pkey->pData, val, pkey->bytes);
|
memcpy(pkey->pData, val, pkey->bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock,
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
int32_t rowIndex, SSDataBlock* pResBlock) {
|
||||||
return NULL;
|
int32_t rows = pResBlock->info.rows;
|
||||||
}
|
|
||||||
|
|
||||||
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
// todo set the correct primary timestamp column
|
||||||
SSDataBlock* pResBlock = pSliceInfo->binfo.pRes;
|
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
|
||||||
|
|
||||||
// if (pOperator->status == OP_RES_TO_RETURN) {
|
// output the result
|
||||||
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
|
||||||
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
|
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
|
||||||
// doSetOperatorCompleted(pOperator);
|
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
||||||
// }
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
//
|
|
||||||
// return pResBlock;
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
switch (pSliceInfo->fillType) {
|
||||||
while (1) {
|
case TSDB_FILL_NULL:
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
colDataAppendNULL(pDst, rows);
|
||||||
if (pBlock == NULL) {
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
case TSDB_FILL_SET_VALUE: {
|
||||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true);
|
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
|
||||||
|
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, 0);
|
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
float v = 0;
|
||||||
int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
|
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
|
||||||
|
colDataAppend(pDst, rows, (char*)&v, false);
|
||||||
if (ts == pSliceInfo->current) {
|
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
for(int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
double v = 0;
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
|
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
|
||||||
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
colDataAppend(pDst, rows, (char*)&v, false);
|
||||||
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
|
||||||
|
int64_t v = 0;
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
|
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
|
colDataAppend(pDst, rows, (char*)&v, false);
|
||||||
|
|
||||||
char* v = colDataGetData(pSrc, i);
|
|
||||||
colDataAppend(pDst, numOfRows, v, false);
|
|
||||||
}
|
}
|
||||||
|
} break;
|
||||||
|
|
||||||
numOfRows += 1;
|
case TSDB_FILL_LINEAR:
|
||||||
|
|
||||||
pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
|
||||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (ts < pSliceInfo->current) {
|
|
||||||
if (i != pBlock->info.window.ekey) {
|
|
||||||
int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1);
|
|
||||||
if (nextTs > pSliceInfo->current) {
|
|
||||||
// output the result
|
|
||||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
|
|
||||||
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
|
||||||
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot);
|
|
||||||
|
|
||||||
switch (pSliceInfo->fillType) {
|
|
||||||
case TSDB_FILL_NULL:
|
|
||||||
colDataAppendNULL(pDst, numOfRows);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_SET_VALUE: {
|
|
||||||
SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal;
|
|
||||||
|
|
||||||
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
|
|
||||||
float v = 0;
|
|
||||||
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
|
|
||||||
colDataAppend(pDst, numOfRows, (char*)&v, false);
|
|
||||||
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
double v = 0;
|
|
||||||
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
|
|
||||||
colDataAppend(pDst, numOfRows, (char*)&v, false);
|
|
||||||
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
|
|
||||||
int64_t v = 0;
|
|
||||||
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
|
|
||||||
colDataAppend(pDst, numOfRows, (char*)&v, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_LINEAR:
|
|
||||||
#if 0
|
#if 0
|
||||||
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|
||||||
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
|
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
|
||||||
// goto interp_exit;
|
// goto interp_exit;
|
||||||
}
|
}
|
||||||
|
@ -1845,33 +1790,158 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TSDB_FILL_PREV: {
|
case TSDB_FILL_PREV: {
|
||||||
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
|
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
|
||||||
colDataAppend(pDst, numOfRows, pkey->pData, false);
|
colDataAppend(pDst, rows, pkey->pData, false);
|
||||||
} break;
|
} break;
|
||||||
|
|
||||||
case TSDB_FILL_NEXT: {
|
case TSDB_FILL_NEXT: {
|
||||||
} break;
|
char* p = colDataGetData(pSrc, rowIndex);
|
||||||
|
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
|
||||||
|
} break;
|
||||||
|
|
||||||
case TSDB_FILL_NONE:
|
case TSDB_FILL_NONE:
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pSliceInfo->current +=
|
pResBlock->info.rows += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
|
if (pInfo->pPrevRow != NULL) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
|
||||||
|
if (pInfo->pPrevRow == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
SGroupKeys key = {0};
|
||||||
|
key.bytes = pColInfo->info.bytes;
|
||||||
|
key.type = pColInfo->info.type;
|
||||||
|
key.isNull = false;
|
||||||
|
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
|
||||||
|
taosArrayPush(pInfo->pPrevRow, &key);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
||||||
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
// if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||||
|
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
|
||||||
|
// doSetOperatorCompleted(pOperator);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return pResBlock;
|
||||||
|
// }
|
||||||
|
|
||||||
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = initPrevRowsKeeper(pSliceInfo, pBlock);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
|
|
||||||
|
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||||
|
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
|
||||||
|
|
||||||
|
if (ts == pSliceInfo->current) {
|
||||||
|
for(int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j];
|
||||||
|
int32_t dstSlot = pExprInfo->base.resSchema.slotId;
|
||||||
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot);
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
||||||
|
|
||||||
|
char* v = colDataGetData(pSrc, i);
|
||||||
|
colDataAppend(pDst, numOfRows, v, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
pResBlock->info.rows += 1;
|
||||||
|
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||||
|
|
||||||
|
pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||||
|
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pResBlock->info.rows >= pResBlock->info.capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else if (ts < pSliceInfo->current) {
|
||||||
|
if (i < pBlock->info.rows - 1) {
|
||||||
|
int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1);
|
||||||
|
if (nextTs > pSliceInfo->current) {
|
||||||
|
while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
|
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
|
||||||
|
pSliceInfo->current =
|
||||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
if (pResBlock->info.rows >= pResBlock->info.capacity) {
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// ignore current row, and do nothing
|
// ignore current row, and do nothing
|
||||||
}
|
}
|
||||||
} else { // it is the last row of current block
|
} else { // it is the last row of current block
|
||||||
doKeepPrevRows(pSliceInfo, pBlock);
|
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||||
|
}
|
||||||
|
} else { // ts > pSliceInfo->current
|
||||||
|
while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||||
|
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pBlock, i, pResBlock);
|
||||||
|
pSliceInfo->current =
|
||||||
|
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||||
|
if (pResBlock->info.rows >= pResBlock->info.capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1886,59 +1956,46 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
return pResBlock->info.rows == 0 ? NULL : pResBlock;
|
return pResBlock->info.rows == 0 ? NULL : pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfCols) {
|
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pPhyNode, SExecTaskInfo* pTaskInfo) {
|
||||||
pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
|
|
||||||
pInfo->pCols = taosArrayInit(4, sizeof(SColumn));
|
|
||||||
|
|
||||||
if (pInfo->pPrevRow == NULL || pInfo->pCols == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SExprInfo* pExpr = pCtx[i].pExpr;
|
|
||||||
|
|
||||||
SFunctParam* pParam = &pExpr->base.pParam[0];
|
|
||||||
|
|
||||||
SColumn c = *pParam->pCol;
|
|
||||||
taosArrayPush(pInfo->pCols, &c);
|
|
||||||
|
|
||||||
SGroupKeys key = {0};
|
|
||||||
key.bytes = c.bytes;
|
|
||||||
key.type = c.type;
|
|
||||||
key.isNull = false;
|
|
||||||
key.pData = taosMemoryCalloc(1, c.bytes);
|
|
||||||
taosArrayPush(pInfo->pPrevRow, &key);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
|
||||||
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) {
|
|
||||||
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
|
STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pOperator == NULL || pInfo == NULL) {
|
if (pOperator == NULL || pInfo == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
int32_t code = initTimesliceInfo(pInfo, pSup->pCtx, numOfCols);
|
int32_t numOfExprs = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
|
||||||
|
int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
if (pInterpPhyNode->pExprs != NULL) {
|
||||||
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode);
|
int32_t num = 0;
|
||||||
|
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
|
||||||
|
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResultBlock;
|
pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
|
||||||
|
pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
|
||||||
|
pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues);
|
||||||
|
pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
pInfo->win = pInterpPhyNode->timeRange;
|
||||||
|
pInfo->interval.interval = pInterpPhyNode->interval;
|
||||||
|
pInfo->current = pInfo->win.skey;
|
||||||
|
|
||||||
pOperator->name = "TimeSliceOperator";
|
pOperator->name = "TimeSliceOperator";
|
||||||
// pOperator->operatorType = OP_AllTimeWindow;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
@ -2433,7 +2490,11 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
|
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) {
|
||||||
initExprSupp(pSup, pExprInfo, numOfCols);
|
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
initBasicInfo(pBasicInfo, pResultBlock);
|
initBasicInfo(pBasicInfo, pResultBlock);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
|
Loading…
Reference in New Issue