refactor: do some internal refactor.
This commit is contained in:
parent
ebf8755d9d
commit
d8455841e0
|
@ -54,10 +54,6 @@ typedef struct SFuncExecFuncs {
|
||||||
FExecCombine combine;
|
FExecCombine combine;
|
||||||
} SFuncExecFuncs;
|
} SFuncExecFuncs;
|
||||||
|
|
||||||
typedef struct SFileBlockInfo {
|
|
||||||
int32_t numBlocksOfStep;
|
|
||||||
} SFileBlockInfo;
|
|
||||||
|
|
||||||
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
|
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
|
||||||
|
|
||||||
#define TOP_BOTTOM_QUERY_LIMIT 100
|
#define TOP_BOTTOM_QUERY_LIMIT 100
|
||||||
|
@ -171,8 +167,6 @@ typedef struct tExprNode {
|
||||||
};
|
};
|
||||||
} tExprNode;
|
} tExprNode;
|
||||||
|
|
||||||
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
|
|
||||||
|
|
||||||
struct SScalarParam {
|
struct SScalarParam {
|
||||||
bool colAlloced;
|
bool colAlloced;
|
||||||
SColumnInfoData *columnData;
|
SColumnInfoData *columnData;
|
||||||
|
@ -182,10 +176,6 @@ struct SScalarParam {
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
|
|
||||||
bool isSuperTable);
|
|
||||||
|
|
||||||
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
|
|
||||||
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
|
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
|
||||||
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
|
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
|
||||||
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
|
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
|
||||||
|
|
|
@ -2458,81 +2458,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
|
||||||
|
|
||||||
STagScanInfo *pInfo = pOperator->info;
|
|
||||||
SSDataBlock *pRes = pInfo->pRes;
|
|
||||||
|
|
||||||
int32_t count = 0;
|
|
||||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
|
||||||
|
|
||||||
int32_t functionId = getExprFunctionId(&pOperator->exprSupp.pExprInfo[0]);
|
|
||||||
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
|
||||||
assert(pQueryAttr->numOfOutput == 1);
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
|
||||||
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
|
||||||
|
|
||||||
count = 0;
|
|
||||||
|
|
||||||
int16_t bytes = pExprInfo->base.resSchema.bytes;
|
|
||||||
int16_t type = pExprInfo->base.resSchema.type;
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < pQueryAttr->numOfTags; ++i) {
|
|
||||||
if (pQueryAttr->tagColList[i].colId == pExprInfo->base.pColumns->info.colId) {
|
|
||||||
bytes = pQueryAttr->tagColList[i].bytes;
|
|
||||||
type = pQueryAttr->tagColList[i].type;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
|
||||||
|
|
||||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
|
||||||
int32_t i = pInfo->curPos++;
|
|
||||||
STableQueryInfo *item = taosArrayGetP(pa, i);
|
|
||||||
|
|
||||||
char *output = pColInfo->pData + count * rsize;
|
|
||||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
|
||||||
|
|
||||||
output = varDataVal(output);
|
|
||||||
STableId* id = TSDB_TABLEID(item->pTable);
|
|
||||||
|
|
||||||
*(int16_t *)output = 0;
|
|
||||||
output += sizeof(int16_t);
|
|
||||||
|
|
||||||
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
|
||||||
output += sizeof(id->uid);
|
|
||||||
|
|
||||||
*(int32_t *)output = id->tid;
|
|
||||||
output += sizeof(id->tid);
|
|
||||||
|
|
||||||
*(int32_t *)output = pQueryAttr->vgId;
|
|
||||||
output += sizeof(pQueryAttr->vgId);
|
|
||||||
|
|
||||||
char* data = NULL;
|
|
||||||
if (pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
|
||||||
data = tsdbGetTableName(item->pTable);
|
|
||||||
} else {
|
|
||||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.pColumns->info.colId, type, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
doSetTagValueToResultBuf(output, data, type, bytes);
|
|
||||||
count += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" create (tableId, tag) info completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
|
||||||
} else if (functionId == FUNCTION_COUNT) {// handle the "count(tbname)" query
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
|
||||||
*(int64_t*)pColInfo->pData = pInfo->totalTables;
|
|
||||||
count = 1;
|
|
||||||
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
|
||||||
} else { // return only the tags|table name etc.
|
|
||||||
#endif
|
|
||||||
|
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef TDENGINE_TAGGFUNCTION_H
|
#ifndef TDENGINE_TFUNCTIONINT_H
|
||||||
#define TDENGINE_TAGGFUNCTION_H
|
#define TDENGINE_TFUNCTIONINT_H
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -28,17 +28,6 @@ extern "C" {
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "tudf.h"
|
#include "tudf.h"
|
||||||
|
|
||||||
#define AVG_FUNCTION_INTER_BUFFER_SIZE 50
|
|
||||||
|
|
||||||
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
|
|
||||||
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
|
|
||||||
|
|
||||||
typedef struct SInterpInfoDetail {
|
|
||||||
TSKEY ts; // interp specified timestamp
|
|
||||||
int8_t type;
|
|
||||||
int8_t primaryCol;
|
|
||||||
} SInterpInfoDetail;
|
|
||||||
|
|
||||||
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
|
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -57,4 +46,4 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif // TDENGINE_TAGGFUNCTION_H
|
#endif // TDENGINE_TFUNCTIONINT_H
|
|
@ -18,10 +18,10 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "taggfunction.h"
|
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdigest.h"
|
#include "tdigest.h"
|
||||||
|
#include "tfunctionInt.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "thistogram.h"
|
#include "thistogram.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
@ -312,14 +312,6 @@ typedef struct SGroupKeyInfo {
|
||||||
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||||
|
|
||||||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
|
||||||
do { \
|
|
||||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
|
||||||
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
|
||||||
__ctx->fpSet.process(__ctx); \
|
|
||||||
} \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||||
do { \
|
do { \
|
||||||
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
|
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,67 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "function.h"
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "texception.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
|
|
||||||
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
|
|
||||||
|
|
||||||
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
|
|
||||||
if (pNode == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) {
|
|
||||||
doExprTreeDestroy(&pNode, fp);
|
|
||||||
}
|
|
||||||
taosMemoryFree(pNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
|
|
||||||
if (*pExpr == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
taosMemoryFree(*pExpr);
|
|
||||||
*pExpr = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: these three functions should be made global
|
|
||||||
static void* exception_calloc(size_t nmemb, size_t size) {
|
|
||||||
void* p = taosMemoryCalloc(nmemb, size);
|
|
||||||
if (p == NULL) {
|
|
||||||
THROW(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void* exception_malloc(size_t size) {
|
|
||||||
void* p = taosMemoryMalloc(size);
|
|
||||||
if (p == NULL) {
|
|
||||||
THROW(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
||||||
static UNUSED_FUNC char* exception_strdup(const char* str) {
|
|
||||||
char* p = strdup(str);
|
|
||||||
if (p == NULL) {
|
|
||||||
THROW(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
return p;
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
#include "function.h"
|
||||||
|
#include "tbuffer.h"
|
||||||
|
#include "tcompression.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
#include "tfunctionInt.h"
|
||||||
|
#include "thistogram.h"
|
||||||
|
#include "tpercentile.h"
|
||||||
|
#include "ttszip.h"
|
||||||
|
#include "tudf.h"
|
||||||
|
|
||||||
|
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) {
|
||||||
|
pCell->initialized = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock) {
|
||||||
|
int32_t maxRows = 0;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
#if 0
|
||||||
|
int32_t id = pCtx[j].functionId;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ts, tag, tagprj function can not decide the output number of current query
|
||||||
|
* the number of output result is decided by main output
|
||||||
|
*/
|
||||||
|
if (id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
|
||||||
|
if (pResInfo != NULL && maxRows < pResInfo->numOfRes) {
|
||||||
|
maxRows = pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(maxRows >= 0);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pResBlock, maxRows);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]);
|
||||||
|
if (pResInfo->numOfRes == 0) {
|
||||||
|
for(int32_t j = 0; j < pResInfo->numOfRes; ++j) {
|
||||||
|
colDataAppend(pCol, j, NULL, true); // TODO add set null data api
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t j = 0; j < pResInfo->numOfRes; ++j) {
|
||||||
|
colDataAppend(pCol, j, GET_ROWCELL_INTERBUF(pResInfo), false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pResBlock->info.rows = maxRows;
|
||||||
|
return maxRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) {
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
return pEntry->complete;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) {
|
||||||
|
return pEntry->initialized;
|
||||||
|
}
|
Loading…
Reference in New Issue