Merge pull request #12163 from taosdata/feature/3.0_liaohj

fix(query): sort according to the generated column data in order by operator.
This commit is contained in:
Haojun Liao 2022-05-06 18:10:14 +08:00 committed by GitHub
commit 29651b7b8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 228 additions and 153 deletions

View File

@ -91,6 +91,8 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -587,15 +587,34 @@ TEST(testCase, projection_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu using st1 tags(1)"); pRes = taos_query(pConn, "create table tu using st1 tags(1)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 10000; ++i) { pRes = taos_query(pConn, "create table tu2 using st2 tags(1)");
char sql[512] = {0}; if (taos_errno(pRes) != 0) {
sprintf(sql, "insert into tu values(now+%da, %d)", i, i); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
for(int32_t i = 0; i < 10000000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql); TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) { if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p)); printf("failed to insert data, reason:%s\n", taos_errstr(p));
@ -604,24 +623,44 @@ TEST(testCase, projection_query_tables) {
taos_free_result(p); taos_free_result(p);
} }
pRes = taos_query(pConn, "select * from tu"); printf("start to insert next table\n");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); for(int32_t i = 0; i < 10000000; i += 20) {
taos_free_result(pRes); char sql[1024] = {0};
ASSERT_TRUE(false); sprintf(sql,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
} }
TAOS_ROW pRow = NULL; taos_free_result(p);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
} }
taos_free_result(pRes); // pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
// taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
@ -659,7 +698,7 @@ TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use db"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s\n", taos_errstr(pRes)); printf("failed to use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);

View File

@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) { for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
int32_t mapIndex = i; int32_t mapIndex = i;
if (pIndexMap) { // if (pIndexMap) {
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); // mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
} // }
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
@ -493,12 +493,12 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = false; bool isNull = false;
if (pBlock->pBlockAgg == NULL) { if (pBlock->pBlockAgg == NULL) {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, NULL); isNull = colDataIsNull_s(pColData, pBlock->info.rows);
} else { } else {
isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
} }
char* p = colDataGetData(pColData, j);
char* p = colDataGetData(pColData, j);
colDataAppend(pDstCol, j - startIndex, p, isNull); colDataAppend(pDstCol, j - startIndex, p, isNull);
} }
} }

View File

@ -87,9 +87,7 @@ typedef struct SResultInfo { // TODO refactor
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window SResultRowPosition pos; // current active time window
// int32_t groupIndex; // group id in table list
// SVariant tag; // SVariant tag;
// SResultRowInfo resInfo; // result info
} STableQueryInfo; } STableQueryInfo;
typedef enum { typedef enum {
@ -365,9 +363,10 @@ typedef struct STableScanInfo {
typedef struct STagScanInfo { typedef struct STagScanInfo {
SColumnInfo *pCols; SColumnInfo *pCols;
SSDataBlock *pRes; SSDataBlock *pRes;
int32_t totalTables; SArray *pColMatchInfo;
int32_t curPos; int32_t curPos;
void *pReader; SReadHandle readHandle;
STableGroupInfo *pTableGroups;
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
@ -579,9 +578,8 @@ typedef struct SSortOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SArray* inputSlotMap; // for index map from table scan output SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize; int32_t bufPageSize;
// int32_t numOfRowsInRes;
// TODO extact struct // TODO extact struct
int64_t startTs; // sort start time int64_t startTs; // sort start time
@ -646,7 +644,7 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
SSDataBlock* loadNextDataBlock(void* param); SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
@ -704,7 +702,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
#if 0 #if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
@ -717,7 +715,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win); STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);

View File

@ -117,18 +117,25 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle);
/** /**
* *
* @param pHandle * @param pHandle
* @param colIndex * @param colId
* @return * @return
*/ */
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex); bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
/** /**
* *
* @param pHandle * @param pHandle
* @param colIndex * @param colId
* @return * @return
*/ */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex); void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
/**
*
* @param pSortHandle
* @return
*/
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -3520,7 +3520,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortedMergeOperatorInfo* pInfo = pOperator->info; SSortedMergeOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL);
} }
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
@ -4701,7 +4701,7 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols); static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget); static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* createIndexMap(SNodeList* pNodeList); static SArray* createIndexMap(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList);
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
@ -4739,7 +4739,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
SQueryTableDataCond cond = {0}; SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode); int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
@ -4783,6 +4782,25 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
int32_t code =
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
int32_t numOfOutputCols = 0;
SArray* colList =
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
return pOperator;
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -4852,16 +4870,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL; SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
if (pSortPhyNode->pExprs != NULL) {
pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
}
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo); int32_t numOfOutputCols = 0;
SArray* pColList =
extractColMatchInfo(pSortPhyNode->pTargets, pSortPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
@ -5019,7 +5037,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
return pList; return pList;
} }
SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget) { SArray* createSortInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) { if (pList == NULL) {
@ -5034,22 +5052,7 @@ SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget) {
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST); bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
bi.slotId = pColNode->slotId;
bool found = false;
for (int32_t j = 0; j < LIST_LENGTH(pNodeListTarget); ++j) {
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeListTarget, j);
SColumnNode* pColNodeT = (SColumnNode*)pTarget->pExpr;
if (pColNode->slotId == pColNodeT->slotId) { // to find slotId in PhysiSort OutputDataBlockDesc
bi.slotId = pTarget->slotId;
found = true;
break;
}
}
if (!found) {
qError("sort slot id does not found");
}
taosArrayPush(pList, &bi); taosArrayPush(pList, &bi);
} }
@ -5166,9 +5169,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
#if 0
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
#endif
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId); return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId);
_error: _error:

View File

@ -13,15 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "ttime.h" #include <libs/function/function.h>
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "os.h" #include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "systable.h"
#include "tglobal.h" #include "tglobal.h"
#include "tname.h" #include "tname.h"
#include "systable.h" #include "ttime.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tmsg.h" #include "tmsg.h"
@ -1159,16 +1160,17 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
} }
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
#if 0
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
#if 0
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity; int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info; STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes; SSDataBlock *pRes = pInfo->pRes;
*newgroup = false;
int32_t count = 0; int32_t count = 0;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
@ -1237,55 +1239,54 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count); //qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
} else { // return only the tags|table name etc. } else { // return only the tags|table name etc.
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo #endif
count = 0; STagScanInfo* pInfo = pOperator->info;
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { SExprInfo* pExprInfo = &pOperator->pExpr[0];
int32_t i = pInfo->curPos++; SSDataBlock* pRes = pInfo->pRes;
STableQueryInfo* item = taosArrayGetP(pa, i); SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
char str[512] = {0};
int32_t count = 0;
SMetaReader mr = {0};
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
char *data = NULL, *dst = NULL;
int16_t type = 0, bytes = 0;
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
// not assign value in case of user defined constant output column SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
continue;
}
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j); // refactor later
type = pExprInfo[j].base.resSchema.type; if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
bytes = pExprInfo[j].base.resSchema.bytes; metaReaderInit(&mr, pInfo->readHandle.meta, 0);
metaGetTableEntryByUid(&mr, item->uid);
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) { STR_TO_VARSTR(str, mr.me.name);
data = tsdbGetTableName(item->pTable); metaReaderClear(&mr);
} else {
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
}
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes; colDataAppend(pDst, count, str, false);
doSetTagValueToResultBuf(dst, data, type, bytes);
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
// doSetTagValueToResultBuf(dst, data, type, bytes);
} }
count += 1; count += 1;
} }
if (pInfo->curPos >= pInfo->totalTables) { if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
}
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
}
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
} }
pRes->info.rows = count; pRes->info.rows = count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
#endif
return TSDB_CODE_SUCCESS;
} }
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
@ -1293,14 +1294,18 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->pReader = readHandle; pInfo->pTableGroups = pTableGroupInfo;
pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pOperator->name = "TagScanOperator"; pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
@ -1311,6 +1316,9 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int
pOperator->numOfExprs = numOfOutput; pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);

View File

@ -5,7 +5,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize; int32_t rowSize = pResBlock->info.rowSize;
@ -20,17 +20,19 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->inputSlotMap = pIndexMap; pInfo->pColMatchInfo= pColMatchColInfo;
pOperator->name = "SortOperator"; pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header
// pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
@ -45,14 +47,12 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
return NULL; return NULL;
} }
// TODO merge aggregate super table
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
bool isNull = tsortIsNullVal(pTupleHandle, i); bool isNull = tsortIsNullVal(pTupleHandle, i);
if (isNull) { if (isNull) {
colDataAppend(pColInfo, pBlock->info.rows, NULL, true); colDataAppendNULL(pColInfo, pBlock->info.rows);
} else { } else {
char* pData = tsortGetValue(pTupleHandle, i); char* pData = tsortGetValue(pTupleHandle, i);
colDataAppend(pColInfo, pBlock->info.rows, pData, false); colDataAppend(pColInfo, pBlock->info.rows, pData, false);
@ -62,11 +62,12 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
blockDataEnsureCapacity(pDataBlock, capacity); ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols);
blockDataEnsureCapacity(pDataBlock, capacity); SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
@ -74,12 +75,32 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
break; break;
} }
appendOneRowToDataBlock(pDataBlock, pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (pDataBlock->info.rows >= capacity) { if (p->info.rows >= capacity) {
return pDataBlock; return pDataBlock;
} }
} }
if (p->info.rows > 0) {
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for(int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
for(int32_t j = 0; j < p->info.numOfCols; ++j) {
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, j);
if (pSrc->info.colId == pmInfo->colId) {
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
colDataAssign(pDst, pSrc, p->info.rows);
break;
}
}
}
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows;
}
blockDataDestroy(p);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
} }
@ -106,16 +127,16 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
SSortOperatorInfo* pInfo = pOperator->info; SSortOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
} }
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; // pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, // int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, pTaskInfo->id.str); pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT,
-1, -1, NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[0]; ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps); tsortAddSource(pInfo->pSortHandle, ps);
@ -127,7 +148,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
} }
void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
@ -135,5 +156,5 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->inputSlotMap); taosArrayDestroy(pInfo->pColMatchInfo);
} }

View File

@ -64,25 +64,8 @@ struct SSortHandle {
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); return createOneDataBlock(pSortHandle->pDataBlock, false);
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = pSchema[i].type;
colInfo.info.bytes = pSchema[i].bytes;
colInfo.info.colId = pSchema[i].colId;
taosArrayPush(pBlock->pDataBlock, &colInfo);
if (IS_VAR_DATA_TYPE(colInfo.info.type)) {
pBlock->info.hasVarCol = true;
}
}
return pBlock;
} }
/** /**
@ -98,7 +81,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t
pSortHandle->numOfPages = numOfPages; pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo; pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pIndexMap = pIndexMap; pSortHandle->pIndexMap = pIndexMap;
if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.orderInfo = pSortInfo;
@ -530,6 +516,17 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if (pHandle->pDataBlock == NULL) { if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false); pHandle->pDataBlock = createOneDataBlock(pBlock, false);
// calculate the buffer pages according to the total available buffers.
int32_t rowSize = blockDataGetRowSize(pBlock);
if (rowSize * 4 > 4096) {
pHandle->pageSize = rowSize * 4;
} else {
pHandle->pageSize = 4096;
}
// todo!!
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
} }
// perform the scalar function calculation before apply the sort // perform the scalar function calculation before apply the sort
@ -538,7 +535,6 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
} }
// todo relocate the columns // todo relocate the columns
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
if (code != 0) { if (code != 0) {
return code; return code;
@ -689,7 +685,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) { bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex); SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
return colDataIsNull(pColInfoSrc, 0, pVHandle->rowIndex, NULL); return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex);
} }
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {

View File

@ -917,7 +917,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateTbnameColumn, .translateFunc = translateTbnameColumn,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = NULL, .sprocessFunc = qTbnameFunction,
.finalizeFunc = NULL .finalizeFunc = NULL
}, },
{ {

View File

@ -1513,3 +1513,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4)); colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false);
return TSDB_CODE_SUCCESS;
}

View File

@ -1023,8 +1023,7 @@ static void vectorMathMultiplyHelper(SColumnInfoData* pLeftCol, SColumnInfoData*
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
continue; // TODO set null or ignore continue; // TODO set null or ignore
} }
*output = getVectorDoubleValueFnLeft(LEFT_COL, i) *output = getVectorDoubleValueFnLeft(LEFT_COL, i) * getVectorDoubleValueFnRight(RIGHT_COL, 0);
* getVectorDoubleValueFnRight(RIGHT_COL, 0);
} }
} }
} }
@ -1050,8 +1049,7 @@ void vectorMathMultiply(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
continue; // TODO set null or ignore continue; // TODO set null or ignore
} }
*output = getVectorDoubleValueFnLeft(LEFT_COL, i) *output = getVectorDoubleValueFnLeft(LEFT_COL, i) * getVectorDoubleValueFnRight(RIGHT_COL, i);
* getVectorDoubleValueFnRight(RIGHT_COL, i);
} }
} else if (pLeft->numOfRows == 1) { } else if (pLeft->numOfRows == 1) {
vectorMathMultiplyHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i); vectorMathMultiplyHelper(pRightCol, pLeftCol, pOutputCol, pRight->numOfRows, step, i);