fix: fix some syntax errors.
This commit is contained in:
parent
5eed1159b1
commit
199a3610f7
|
@ -1,14 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
|
@ -347,10 +347,11 @@ typedef struct STableScanInfo {
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanInfo {
|
typedef struct STagScanInfo {
|
||||||
SColumnInfo* pCols;
|
SColumnInfo *pCols;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock *pRes;
|
||||||
int32_t totalTables;
|
int32_t totalTables;
|
||||||
int32_t curPos;
|
int32_t curPos;
|
||||||
|
void *pReader;
|
||||||
} STagScanInfo;
|
} STagScanInfo;
|
||||||
|
|
||||||
typedef struct SStreamBlockScanInfo {
|
typedef struct SStreamBlockScanInfo {
|
||||||
|
@ -376,13 +377,11 @@ typedef struct SSysTableScanInfo {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
tsem_t ready;
|
tsem_t ready;
|
||||||
|
|
||||||
int32_t accountId;
|
int32_t accountId;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
|
||||||
void* pCur; // cursor for iterate the local table meta store.
|
void* pCur; // cursor for iterate the local table meta store.
|
||||||
SArray* scanCols; // SArray<int16_t> scan column id list
|
SArray* scanCols; // SArray<int16_t> scan column id list
|
||||||
|
|
||||||
// int32_t type; // show type, TODO remove it
|
|
||||||
SName name;
|
SName name;
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
int32_t capacity;
|
int32_t capacity;
|
||||||
|
@ -628,7 +627,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
||||||
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
|
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
|
||||||
SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo);
|
SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
|
@ -668,12 +667,12 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
||||||
|
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||||
SExprInfo* pExpr, int32_t numOfOutput);
|
SExprInfo* pExpr, int32_t numOfOutput);
|
||||||
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||||
|
|
|
@ -235,7 +235,6 @@ static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDi
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
||||||
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
|
|
||||||
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
|
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
@ -298,26 +297,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|
||||||
return true;
|
|
||||||
// bool hasTags = false;
|
|
||||||
// int32_t numOfSelectivity = 0;
|
|
||||||
//
|
|
||||||
// for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
// int32_t functId = pCtx[i].functionId;
|
|
||||||
// if (functId == FUNCTION_TAG_DUMMY || functId == FUNCTION_TS_DUMMY) {
|
|
||||||
// hasTags = true;
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if ((aAggs[functId].status & FUNCSTATE_SELECTIVITY) != 0) {
|
|
||||||
// numOfSelectivity++;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return (numOfSelectivity > 0 && hasTags);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||||
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
|
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
|
||||||
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
@ -1858,10 +1837,6 @@ void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock*
|
||||||
|
|
||||||
// set the output buffer for the selectivity + tag query
|
// set the output buffer for the selectivity + tag query
|
||||||
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
if (!isSelectivityWithTagsQuery(pCtx, numOfOutput)) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int16_t tagLen = 0;
|
int16_t tagLen = 0;
|
||||||
|
|
||||||
|
@ -2111,25 +2086,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doUpdateLastKey(STaskAttr* pQueryAttr) {
|
|
||||||
STimeWindow* win = &pQueryAttr->window;
|
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList);
|
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
|
||||||
SArray* p1 = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
|
|
||||||
|
|
||||||
size_t len = taosArrayGetSize(p1);
|
|
||||||
for (int32_t j = 0; j < len; ++j) {
|
|
||||||
// STableKeyInfo* pInfo = taosArrayGet(p1, j);
|
|
||||||
//
|
|
||||||
// // update the new lastkey if it is equalled to the value of the old skey
|
|
||||||
// if (pInfo->lastKey == win->ekey) {
|
|
||||||
// pInfo->lastKey = win->skey;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
|
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
|
||||||
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||||
//
|
//
|
||||||
|
@ -3075,33 +3031,6 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
||||||
pAggInfo->groupId = groupId;
|
pAggInfo->groupId = groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
|
|
||||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
|
|
||||||
SExprBasicInfo* pExpr = &pExprInfo->base;
|
|
||||||
// if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) &&
|
|
||||||
// (pExpr->functionId == FUNCTION_TS || pExpr->functionId == FUNCTION_PRJ) &&
|
|
||||||
// (pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_ID)) {
|
|
||||||
// assert(pExpr->numOfParams == 1);
|
|
||||||
//
|
|
||||||
// int16_t tagColId = (int16_t)pExprInfo->base.param[0].i;
|
|
||||||
// SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId);
|
|
||||||
//
|
|
||||||
// doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes);
|
|
||||||
//
|
|
||||||
// int16_t tagType = pCtx[0].tag.nType;
|
|
||||||
// if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
// //qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%s",
|
|
||||||
// GET_TASKID(pRuntimeEnv),
|
|
||||||
//// pExprInfo->base.param[0].i, pCtx[0].tag.pz);
|
|
||||||
// } else {
|
|
||||||
// //qDebug("QInfo:0x%"PRIx64" set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64,
|
|
||||||
// GET_TASKID(pRuntimeEnv),
|
|
||||||
//// pExprInfo->base.param[0].i, pCtx[0].tag.i);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* There are two cases to handle:
|
* There are two cases to handle:
|
||||||
*
|
*
|
||||||
|
@ -5863,11 +5792,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|
||||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
|
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
|
||||||
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
||||||
|
@ -6221,157 +6145,6 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
|
||||||
#if 0
|
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
|
||||||
|
|
||||||
STagScanInfo *pInfo = pOperator->info;
|
|
||||||
SSDataBlock *pRes = pInfo->pRes;
|
|
||||||
*newgroup = false;
|
|
||||||
|
|
||||||
int32_t count = 0;
|
|
||||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
|
||||||
|
|
||||||
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
|
|
||||||
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
|
||||||
assert(pQueryAttr->numOfOutput == 1);
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
|
||||||
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
|
||||||
|
|
||||||
count = 0;
|
|
||||||
|
|
||||||
int16_t bytes = pExprInfo->base.resSchema.bytes;
|
|
||||||
int16_t type = pExprInfo->base.resSchema.type;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
|
|
||||||
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
|
|
||||||
bytes = pQueryAttr->tagColList[i].bytes;
|
|
||||||
type = pQueryAttr->tagColList[i].type;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
|
||||||
|
|
||||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
|
||||||
int32_t i = pInfo->curPos++;
|
|
||||||
STableQueryInfo *item = taosArrayGetP(pa, i);
|
|
||||||
|
|
||||||
char *output = pColInfo->pData + count * rsize;
|
|
||||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
|
||||||
|
|
||||||
output = varDataVal(output);
|
|
||||||
STableId* id = TSDB_TABLEID(item->pTable);
|
|
||||||
|
|
||||||
*(int16_t *)output = 0;
|
|
||||||
output += sizeof(int16_t);
|
|
||||||
|
|
||||||
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
|
||||||
output += sizeof(id->uid);
|
|
||||||
|
|
||||||
*(int32_t *)output = id->tid;
|
|
||||||
output += sizeof(id->tid);
|
|
||||||
|
|
||||||
*(int32_t *)output = pQueryAttr->vgId;
|
|
||||||
output += sizeof(pQueryAttr->vgId);
|
|
||||||
|
|
||||||
char* data = NULL;
|
|
||||||
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
|
||||||
data = tsdbGetTableName(item->pTable);
|
|
||||||
} else {
|
|
||||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
doSetTagValueToResultBuf(output, data, type, bytes);
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
|
||||||
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
|
||||||
*(int64_t*)pColInfo->pData = pInfo->totalTables;
|
|
||||||
count = 1;
|
|
||||||
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
|
||||||
} else { // return only the tags|table name etc.
|
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
|
||||||
|
|
||||||
count = 0;
|
|
||||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
|
||||||
int32_t i = pInfo->curPos++;
|
|
||||||
|
|
||||||
STableQueryInfo* item = taosArrayGetP(pa, i);
|
|
||||||
|
|
||||||
char *data = NULL, *dst = NULL;
|
|
||||||
int16_t type = 0, bytes = 0;
|
|
||||||
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
|
||||||
// not assign value in case of user defined constant output column
|
|
||||||
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
|
|
||||||
type = pExprInfo[j].base.resSchema.type;
|
|
||||||
bytes = pExprInfo[j].base.resSchema.bytes;
|
|
||||||
|
|
||||||
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
|
||||||
data = tsdbGetTableName(item->pTable);
|
|
||||||
} else {
|
|
||||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
|
||||||
doSetTagValueToResultBuf(dst, data, type, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pInfo->curPos >= pInfo->totalTables) {
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
|
|
||||||
}
|
|
||||||
|
|
||||||
pRes->info.rows = count;
|
|
||||||
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
|
|
||||||
|
|
||||||
#endif
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
||||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
|
||||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
|
||||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
|
||||||
|
|
||||||
pInfo->curPos = 0;
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
|
||||||
pOperator->name = "SeqTableTagScan";
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
|
||||||
pOperator->blockingOptr = false;
|
|
||||||
pOperator->status = OP_NOT_OPENED;
|
|
||||||
pOperator->info = pInfo;
|
|
||||||
pOperator->getNextFn = doTagScan;
|
|
||||||
pOperator->pExpr = pExpr;
|
|
||||||
pOperator->numOfOutput = numOfOutput;
|
|
||||||
pOperator->closeFn = destroyTagScanOperatorInfo;
|
|
||||||
|
|
||||||
return pOperator;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
|
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
|
|
|
@ -980,3 +980,167 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
#if 0
|
||||||
|
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
||||||
|
|
||||||
|
STagScanInfo *pInfo = pOperator->info;
|
||||||
|
SSDataBlock *pRes = pInfo->pRes;
|
||||||
|
*newgroup = false;
|
||||||
|
|
||||||
|
int32_t count = 0;
|
||||||
|
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||||
|
|
||||||
|
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
|
||||||
|
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
||||||
|
assert(pQueryAttr->numOfOutput == 1);
|
||||||
|
|
||||||
|
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
||||||
|
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
||||||
|
|
||||||
|
count = 0;
|
||||||
|
|
||||||
|
int16_t bytes = pExprInfo->base.resSchema.bytes;
|
||||||
|
int16_t type = pExprInfo->base.resSchema.type;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
|
||||||
|
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
|
||||||
|
bytes = pQueryAttr->tagColList[i].bytes;
|
||||||
|
type = pQueryAttr->tagColList[i].type;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
||||||
|
|
||||||
|
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||||
|
int32_t i = pInfo->curPos++;
|
||||||
|
STableQueryInfo *item = taosArrayGetP(pa, i);
|
||||||
|
|
||||||
|
char *output = pColInfo->pData + count * rsize;
|
||||||
|
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
||||||
|
|
||||||
|
output = varDataVal(output);
|
||||||
|
STableId* id = TSDB_TABLEID(item->pTable);
|
||||||
|
|
||||||
|
*(int16_t *)output = 0;
|
||||||
|
output += sizeof(int16_t);
|
||||||
|
|
||||||
|
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
||||||
|
output += sizeof(id->uid);
|
||||||
|
|
||||||
|
*(int32_t *)output = id->tid;
|
||||||
|
output += sizeof(id->tid);
|
||||||
|
|
||||||
|
*(int32_t *)output = pQueryAttr->vgId;
|
||||||
|
output += sizeof(pQueryAttr->vgId);
|
||||||
|
|
||||||
|
char* data = NULL;
|
||||||
|
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
|
data = tsdbGetTableName(item->pTable);
|
||||||
|
} else {
|
||||||
|
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
doSetTagValueToResultBuf(output, data, type, bytes);
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||||
|
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
||||||
|
*(int64_t*)pColInfo->pData = pInfo->totalTables;
|
||||||
|
count = 1;
|
||||||
|
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
||||||
|
} else { // return only the tags|table name etc.
|
||||||
|
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
||||||
|
|
||||||
|
count = 0;
|
||||||
|
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
||||||
|
int32_t i = pInfo->curPos++;
|
||||||
|
|
||||||
|
STableQueryInfo* item = taosArrayGetP(pa, i);
|
||||||
|
|
||||||
|
char *data = NULL, *dst = NULL;
|
||||||
|
int16_t type = 0, bytes = 0;
|
||||||
|
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
|
||||||
|
// not assign value in case of user defined constant output column
|
||||||
|
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
|
||||||
|
type = pExprInfo[j].base.resSchema.type;
|
||||||
|
bytes = pExprInfo[j].base.resSchema.bytes;
|
||||||
|
|
||||||
|
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
|
data = tsdbGetTableName(item->pTable);
|
||||||
|
} else {
|
||||||
|
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
||||||
|
doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->curPos >= pInfo->totalTables) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
//qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
|
||||||
|
}
|
||||||
|
|
||||||
|
pRes->info.rows = count;
|
||||||
|
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
|
||||||
|
|
||||||
|
#endif
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||||
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||||
|
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||||
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pReader = pReaderHandle;
|
||||||
|
pInfo->curPos = 0;
|
||||||
|
pOperator->name = "TagScanOperator";
|
||||||
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
|
pOperator->blockingOptr = false;
|
||||||
|
pOperator->status = OP_NOT_OPENED;
|
||||||
|
pOperator->info = pInfo;
|
||||||
|
pOperator->getNextFn = doTagScan;
|
||||||
|
pOperator->pExpr = pExpr;
|
||||||
|
pOperator->numOfOutput = numOfOutput;
|
||||||
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
pOperator->closeFn = destroyTagScanOperatorInfo;
|
||||||
|
|
||||||
|
return pOperator;
|
||||||
|
_error:
|
||||||
|
taosMemoryFree(pInfo);
|
||||||
|
taosMemoryFree(pOperator);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
|
@ -44,8 +44,6 @@ typedef struct STopBotResItem {
|
||||||
} STopBotResItem;
|
} STopBotResItem;
|
||||||
|
|
||||||
typedef struct STopBotRes {
|
typedef struct STopBotRes {
|
||||||
int32_t pageId;
|
|
||||||
// int32_t num;
|
|
||||||
STopBotResItem *pItems;
|
STopBotResItem *pItems;
|
||||||
} STopBotRes;
|
} STopBotRes;
|
||||||
|
|
||||||
|
@ -92,18 +90,6 @@ typedef struct SDiffInfo {
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
|
||||||
do { \
|
|
||||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
|
||||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
|
||||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
|
||||||
__ctx->tag.i = (ts); \
|
|
||||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
|
||||||
} \
|
|
||||||
__ctx->fpSet.process(__ctx); \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||||
do { \
|
do { \
|
||||||
if (((left) < (right)) ^ (sign)) { \
|
if (((left) < (right)) ^ (sign)) { \
|
||||||
|
@ -407,7 +393,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
@ -417,7 +403,7 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
||||||
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
|
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
|
||||||
}
|
}
|
||||||
|
|
||||||
return functionFinalize(pCtx, pBlock, slotId);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
||||||
|
@ -841,7 +827,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
|
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
@ -1383,21 +1369,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct STopBotResItem {
|
|
||||||
SVariant v;
|
|
||||||
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
|
|
||||||
struct {
|
|
||||||
int32_t pageId;
|
|
||||||
int32_t offset;
|
|
||||||
} tuplePos; // tuple data of this chosen row
|
|
||||||
} STopBotResItem;
|
|
||||||
|
|
||||||
typedef struct STopBotRes {
|
|
||||||
// int32_t pageId;
|
|
||||||
// int32_t num;
|
|
||||||
STopBotResItem *pItems;
|
|
||||||
} STopBotRes;
|
|
||||||
|
|
||||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
|
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
|
||||||
|
|
Loading…
Reference in New Issue