[td-225]fix bug found in test script
This commit is contained in:
parent
988400dc43
commit
74bb1565fa
|
@ -175,7 +175,7 @@ SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIn
|
|||
|
||||
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
|
||||
int16_t size);
|
||||
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
|
||||
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
|
||||
|
||||
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
|
||||
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
|
||||
|
|
|
@ -1848,13 +1848,14 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
|
|||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
|
||||
SLastrowInfo *pInfo = (SLastrowInfo *)pResInfo->interResultBuf;
|
||||
pInfo->ts = pCtx->param[0].i64Key;
|
||||
pInfo->ts = pCtx->ptsList[0];
|
||||
|
||||
pInfo->hasResult = DATA_SET_FLAG;
|
||||
|
||||
// set the result to final result buffer
|
||||
if (pResInfo->superTableQ) {
|
||||
SLastrowInfo *pInfo1 = (SLastrowInfo *)(pCtx->aOutputBuf + pCtx->inputBytes);
|
||||
pInfo1->ts = pCtx->param[0].i64Key;
|
||||
pInfo1->ts = pCtx->ptsList[0];
|
||||
pInfo1->hasResult = DATA_SET_FLAG;
|
||||
|
||||
DO_UPDATE_TAG_COLUMNS(pCtx, pInfo1->ts);
|
||||
|
@ -1904,13 +1905,12 @@ static void valuePairAssign(tValuePair *dst, int16_t type, const char *val, int6
|
|||
memcpy(dst->pTags, pTags, (size_t)pTagInfo->tagsLen);
|
||||
} else { // the tags are dumped from the ctx tag fields
|
||||
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) {
|
||||
SQLFunctionCtx* __ctx = pTagInfo->pTagCtxList[i];
|
||||
if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
__ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey};
|
||||
SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i];
|
||||
if (ctx->functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
ctx->tag = (tVariant) {.nType = TSDB_DATA_TYPE_BIGINT, .i64Key = tsKey};
|
||||
}
|
||||
|
||||
//todo? error ??
|
||||
tVariantDump(&pTagInfo->pTagCtxList[i]->tag, dst->pTags + size, pTagInfo->pTagCtxList[i]->tag.nType, false);
|
||||
tVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true);
|
||||
size += pTagInfo->pTagCtxList[i]->outputBytes;
|
||||
}
|
||||
}
|
||||
|
@ -2227,7 +2227,6 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
|
|||
static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
|
||||
char *tmp = (char *)pTopBotInfo + sizeof(STopBotInfo);
|
||||
pTopBotInfo->res = (tValuePair**) tmp;
|
||||
|
||||
tmp += POINTER_BYTES * pCtx->param[0].i64Key;
|
||||
|
||||
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
|
||||
|
@ -3823,115 +3822,11 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
/**
|
||||
* param[1]: default value/previous value of specified timestamp
|
||||
* param[2]: next value of specified timestamp
|
||||
* param[3]: denotes if the result is a precious result or interpolation results
|
||||
*
|
||||
* param[1]: denote the specified timestamp to generated the interp result
|
||||
* param[2]: fill policy
|
||||
*
|
||||
* @param pCtx
|
||||
*/
|
||||
static void interp_function(SQLFunctionCtx *pCtx) {
|
||||
// at this point, the value is existed, return directly
|
||||
#if 0
|
||||
if (pCtx->param[3].i64Key == 1) {
|
||||
char *pData = GET_INPUT_CHAR(pCtx);
|
||||
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
|
||||
} else {
|
||||
/*
|
||||
* use interpolation to generate the result.
|
||||
* Note: the result of primary timestamp column uses the timestamp specified by user in the query sql
|
||||
*/
|
||||
assert(pCtx->param[3].i64Key == 2);
|
||||
|
||||
SInterpInfo interpInfo = *(SInterpInfo *)pCtx->aOutputBuf;
|
||||
SInterpInfoDetail *pInfoDetail = interpInfo.pInterpDetail;
|
||||
|
||||
/* set no output result */
|
||||
if (pInfoDetail->type == TSDB_FILL_NONE) {
|
||||
pCtx->param[3].i64Key = 0;
|
||||
} else if (pInfoDetail->primaryCol == 1) {
|
||||
*(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts;
|
||||
} else {
|
||||
if (pInfoDetail->type == TSDB_FILL_NULL) {
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
} else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) {
|
||||
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType);
|
||||
} else if (pInfoDetail->type == TSDB_FILL_PREV) {
|
||||
char *data = pCtx->param[1].pz;
|
||||
char *pVal = data + TSDB_KEYSIZE;
|
||||
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = GET_DOUBLE_VAL(pVal);
|
||||
assignVal(pCtx->aOutputBuf, (const char*) &v, pCtx->outputBytes, pCtx->outputType);
|
||||
} else {
|
||||
assignVal(pCtx->aOutputBuf, pVal, pCtx->outputBytes, pCtx->outputType);
|
||||
}
|
||||
|
||||
} else if (pInfoDetail->type == TSDB_FILL_LINEAR) {
|
||||
char *data1 = pCtx->param[1].pz;
|
||||
char *data2 = pCtx->param[2].pz;
|
||||
|
||||
char *pVal1 = data1 + TSDB_KEYSIZE;
|
||||
char *pVal2 = data2 + TSDB_KEYSIZE;
|
||||
|
||||
SPoint point1 = {.key = *(TSKEY *)data1, .val = &pCtx->param[1].i64Key};
|
||||
SPoint point2 = {.key = *(TSKEY *)data2, .val = &pCtx->param[2].i64Key};
|
||||
|
||||
SPoint point = {.key = pInfoDetail->ts, .val = pCtx->aOutputBuf};
|
||||
|
||||
int32_t srcType = pCtx->inputType;
|
||||
if ((srcType >= TSDB_DATA_TYPE_TINYINT && srcType <= TSDB_DATA_TYPE_BIGINT) ||
|
||||
srcType == TSDB_DATA_TYPE_TIMESTAMP || srcType == TSDB_DATA_TYPE_DOUBLE) {
|
||||
point1.val = pVal1;
|
||||
|
||||
point2.val = pVal2;
|
||||
|
||||
if (isNull(pVal1, srcType) || isNull(pVal2, srcType)) {
|
||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||
} else {
|
||||
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
|
||||
}
|
||||
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v1 = GET_DOUBLE_VAL(pVal1);
|
||||
float v2 = GET_DOUBLE_VAL(pVal2);
|
||||
|
||||
point1.val = &v1;
|
||||
point2.val = &v2;
|
||||
|
||||
if (isNull(pVal1, srcType) || isNull(pVal2, srcType)) {
|
||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||
} else {
|
||||
taosDoLinearInterpolation(pCtx->outputType, &point1, &point2, &point);
|
||||
}
|
||||
|
||||
} else {
|
||||
if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->inputBytes);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
free(interpInfo.pInterpDetail);
|
||||
}
|
||||
|
||||
pCtx->size = pCtx->param[3].i64Key;
|
||||
|
||||
tVariantDestroy(&pCtx->param[1]);
|
||||
tVariantDestroy(&pCtx->param[2]);
|
||||
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(pCtx, pCtx->size, 1);
|
||||
#endif
|
||||
|
||||
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SInterpInfoDetail* pInfo = pResInfo->interResultBuf;
|
||||
|
||||
|
|
|
@ -1403,7 +1403,6 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
|
|||
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE};
|
||||
strcpy(colSchema.name, TSQL_TBNAME_L);
|
||||
|
||||
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
|
||||
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
|
@ -4166,6 +4165,10 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
|
|||
if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) {
|
||||
int32_t relTagIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
|
||||
|
||||
// it is a tag column
|
||||
if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
|
||||
}
|
||||
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
|
||||
if (relTagIndex == pColIndex->colIndex) {
|
||||
orderByTags = true;
|
||||
|
@ -4678,7 +4681,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
|
|||
|
||||
const char* msg0 = "soffset/offset can not be less than 0";
|
||||
const char* msg1 = "slimit/soffset only available for STable query";
|
||||
const char* msg2 = "function not supported on table";
|
||||
const char* msg2 = "functions mixed up in table query";
|
||||
const char* msg3 = "slimit/soffset can not apply to projection query";
|
||||
|
||||
// handle the limit offset value, validate the limit
|
||||
|
@ -4761,14 +4764,22 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
|
|||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pQueryInfo->exprList);
|
||||
|
||||
|
||||
bool hasTags = false;
|
||||
bool hasOtherFunc = false;
|
||||
// filter the query functions operating on "tbname" column that are not supported by normal columns.
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
|
||||
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
|
||||
hasTags = true;
|
||||
} else {
|
||||
hasOtherFunc = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (hasTags && hasOtherFunc) {
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -5831,7 +5842,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
|
|||
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
|
||||
}
|
||||
} else { // set the time rang
|
||||
pQueryInfo->window.skey = 0;
|
||||
pQueryInfo->window.skey = TSKEY_INITIAL_VAL;
|
||||
pQueryInfo->window.ekey = INT64_MAX;
|
||||
}
|
||||
|
||||
|
|
|
@ -1049,7 +1049,14 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
|
|||
int32_t functionId = pExpr->functionId;
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
tVariantDestroy(&pCtx->tag);
|
||||
tVariantCreateFromBinary(&pCtx->tag, pCtx->aInputElemBuf, pCtx->inputBytes, pCtx->inputType);
|
||||
char* input = pCtx->aInputElemBuf;
|
||||
|
||||
if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
assert(varDataLen(input) <= pCtx->inputBytes);
|
||||
tVariantCreateFromBinary(&pCtx->tag, varDataVal(input), varDataLen(input), pCtx->inputType);
|
||||
} else {
|
||||
tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
|
||||
}
|
||||
}
|
||||
|
||||
pCtx->currentStage = SECONDARY_STAGE_MERGE;
|
||||
|
@ -1309,7 +1316,7 @@ static bool isAllSourcesCompleted(SLocalReducer *pLocalReducer) {
|
|||
return (pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted);
|
||||
}
|
||||
|
||||
static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
|
||||
static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
|
@ -1347,8 +1354,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
|
|||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
SLocalReducer * pLocalReducer = pRes->pLocalReducer;
|
||||
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
|
||||
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
|
||||
SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
|
||||
|
||||
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
|
||||
|
||||
|
@ -1445,7 +1452,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (doInterpolationForCurrentGroup(pSql)) {
|
||||
if (doBuildFilledResultForGroup(pSql)) {
|
||||
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1464,8 +1471,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
|
|||
#ifdef _DEBUG_VIEW
|
||||
printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
|
||||
#endif
|
||||
assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) &&
|
||||
tmpBuffer->num == 0);
|
||||
assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
|
||||
|
||||
// chosen from loser tree
|
||||
SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index];
|
||||
|
|
|
@ -1032,7 +1032,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
|
|||
return pExpr;
|
||||
}
|
||||
|
||||
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
|
||||
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
|
||||
return taosArrayGetSize(pQueryInfo->exprList);
|
||||
}
|
||||
|
||||
|
@ -1351,7 +1351,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (colId == -1 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
if (colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2122,7 +2122,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
|
|||
int32_t type = pInfo->pSqlExpr->resType;
|
||||
int32_t bytes = pInfo->pSqlExpr->resBytes;
|
||||
|
||||
char* pData = ((char*) pRes->data) + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
|
||||
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
|
||||
|
||||
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
|
||||
int32_t realLen = varDataLen(pData);
|
||||
|
@ -2135,7 +2135,7 @@ void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t column
|
|||
}
|
||||
|
||||
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
|
||||
*(char*) (pData + realLen + VARSTR_HEADER_SIZE) = 0;
|
||||
*(pData + realLen + VARSTR_HEADER_SIZE) = 0;
|
||||
}
|
||||
|
||||
pRes->length[columnIndex] = realLen;
|
||||
|
|
|
@ -620,13 +620,6 @@ typedef struct {
|
|||
SCMVgroupInfo vgroups[];
|
||||
} SVgroupsInfo;
|
||||
|
||||
//typedef struct {
|
||||
// int32_t numOfTables;
|
||||
// int32_t join;
|
||||
// int32_t joinCondLen; // for join condition
|
||||
// int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
|
||||
//} SSuperTableMetaMsg;
|
||||
|
||||
typedef struct STableMetaMsg {
|
||||
int32_t contLen;
|
||||
char tableId[TSDB_TABLE_ID_LEN + 1]; // table id
|
||||
|
|
|
@ -198,6 +198,8 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
|
|||
*/
|
||||
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo);
|
||||
|
||||
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
|
||||
|
||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList);
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,8 +28,7 @@ extern "C" {
|
|||
#include "tdataformat.h"
|
||||
#include "talgo.h"
|
||||
|
||||
#define DEFAULT_PAGE_SIZE 16384 // 16k larger than the SHistoInfo
|
||||
#define MIN_BUFFER_SIZE (1 << 19)
|
||||
#define DEFAULT_PAGE_SIZE (1024L*56) // 16k larger than the SHistoInfo
|
||||
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
|
||||
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
|
||||
|
||||
|
|
|
@ -44,6 +44,8 @@ typedef struct SDiskbasedResultBuf {
|
|||
SIDList* list; // for each id, there is a page id list
|
||||
} SDiskbasedResultBuf;
|
||||
|
||||
#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5)
|
||||
|
||||
/**
|
||||
* create disk-based result buffer
|
||||
* @param pResultBuf
|
||||
|
|
|
@ -161,26 +161,24 @@ typedef struct SExtTagsInfo {
|
|||
|
||||
// sql function runtime context
|
||||
typedef struct SQLFunctionCtx {
|
||||
int32_t startOffset;
|
||||
int32_t size; // number of rows
|
||||
uint32_t order; // asc|desc
|
||||
uint32_t scanFlag; // TODO merge with currentStage
|
||||
|
||||
int16_t inputType;
|
||||
int16_t inputBytes;
|
||||
|
||||
int16_t outputType;
|
||||
int16_t outputBytes; // size of results, determined by function and input column data type
|
||||
bool hasNull; // null value exist in current block
|
||||
int16_t functionId; // function id
|
||||
void * aInputElemBuf;
|
||||
char * aOutputBuf; // final result output buffer, point to sdata->data
|
||||
uint8_t currentStage; // record current running step, default: 0
|
||||
int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block
|
||||
int32_t numOfParams;
|
||||
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
|
||||
int64_t *ptsList; // corresponding timestamp array list
|
||||
void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
|
||||
int32_t startOffset;
|
||||
int32_t size; // number of rows
|
||||
uint32_t order; // asc|desc
|
||||
int16_t inputType;
|
||||
int16_t inputBytes;
|
||||
|
||||
int16_t outputType;
|
||||
int16_t outputBytes; // size of results, determined by function and input column data type
|
||||
bool hasNull; // null value exist in current block
|
||||
int16_t functionId; // function id
|
||||
void * aInputElemBuf;
|
||||
char * aOutputBuf; // final result output buffer, point to sdata->data
|
||||
uint8_t currentStage; // record current running step, default: 0
|
||||
int64_t nStartQueryTimestamp; // timestamp range of current query when function is executed on a specific data block
|
||||
int32_t numOfParams;
|
||||
tVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param */
|
||||
int64_t * ptsList; // corresponding timestamp array list
|
||||
void * ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
|
||||
SQLPreAggVal preAggVals;
|
||||
tVariant tag;
|
||||
SResultInfo *resultInfo;
|
||||
|
|
|
@ -48,7 +48,7 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
|
|||
|
||||
int32_t tVariantToString(tVariant *pVar, char *dst);
|
||||
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, char type, bool includeLengthPrefix);
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix);
|
||||
|
||||
int32_t tVariantTypeSetType(tVariant *pVariant, char type);
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include <qfill.h>
|
||||
#include "qfill.h"
|
||||
#include "os.h"
|
||||
|
||||
#include "hash.h"
|
||||
|
@ -30,8 +30,6 @@
|
|||
#include "tscompression.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define DEFAULT_INTERN_BUF_SIZE 16384L
|
||||
|
||||
/**
|
||||
* check if the primary column is load by default, otherwise, the program will
|
||||
* forced to load primary column explicitly.
|
||||
|
@ -821,7 +819,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
|||
*
|
||||
* @param pRuntimeEnv
|
||||
* @param forwardStep
|
||||
* @param primaryKeyCol
|
||||
* @param tsCols
|
||||
* @param pFields
|
||||
* @param isDiskFileBlock
|
||||
* @return the incremental number of output value, so it maybe 0 for fixed number of query,
|
||||
|
@ -831,25 +829,25 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo,
|
||||
__block_search_fn_t searchFn, SArray *pDataBlock) {
|
||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
TSKEY *primaryKeyCol = NULL;
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
TSKEY *tsCols = NULL;
|
||||
if (pDataBlock != NULL) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, 0);
|
||||
primaryKeyCol = (TSKEY *)(pColInfo->pData);
|
||||
tsCols = (TSKEY *)(pColInfo->pData);
|
||||
}
|
||||
|
||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
||||
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
||||
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k);
|
||||
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
|
||||
}
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
|
||||
TSKEY ts = primaryKeyCol[offset];
|
||||
TSKEY ts = tsCols[offset];
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -858,16 +856,16 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
|
||||
TSKEY ekey = reviseWindowEkey(pQuery, &win);
|
||||
int32_t forwardStep =
|
||||
getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, true);
|
||||
getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
|
||||
|
||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, primaryKeyCol, pDataBlockInfo->rows);
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, pQuery->pos, forwardStep, tsCols, pDataBlockInfo->rows);
|
||||
|
||||
int32_t index = pWindowResInfo->curIndex;
|
||||
STimeWindow nextWin = win;
|
||||
|
||||
while (1) {
|
||||
int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, primaryKeyCol, searchFn);
|
||||
int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -878,10 +876,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
}
|
||||
|
||||
ekey = reviseWindowEkey(pQuery, &nextWin);
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, primaryKeyCol, startPos, ekey, searchFn, true);
|
||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
|
||||
|
||||
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, primaryKeyCol, pDataBlockInfo->rows);
|
||||
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
|
||||
}
|
||||
|
||||
pWindowResInfo->curIndex = index;
|
||||
|
@ -1043,7 +1041,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
STableQueryInfo* item = pQuery->current;
|
||||
|
||||
TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
|
||||
TSKEY *tsCols = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
|
||||
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
||||
|
||||
|
@ -1057,7 +1055,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
||||
setExecParams(pQuery, &pCtx[k], dataBlock, primaryKeyCol, pDataBlockInfo, pStatis, &sasArray[k], k);
|
||||
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
|
||||
}
|
||||
|
||||
// set the input column data
|
||||
|
@ -1101,7 +1099,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
// interval window query
|
||||
if (isIntervalQuery(pQuery)) {
|
||||
// decide the time window according to the primary timestamp
|
||||
int64_t ts = primaryKeyCol[offset];
|
||||
int64_t ts = tsCols[offset];
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win);
|
||||
|
@ -1165,7 +1163,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
}
|
||||
}
|
||||
|
||||
item->lastKey = primaryKeyCol[offset] + step;
|
||||
item->lastKey = tsCols[offset] + step;
|
||||
|
||||
// todo refactor: extract method
|
||||
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
|
@ -1310,14 +1308,15 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
|
|||
// set the output buffer for the selectivity + tag query
|
||||
static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
|
||||
if (isSelectivityWithTagsQuery(pQuery)) {
|
||||
int32_t num = 0;
|
||||
SQLFunctionCtx *p = NULL;
|
||||
|
||||
int32_t num = 0;
|
||||
int16_t tagLen = 0;
|
||||
|
||||
|
||||
SQLFunctionCtx *p = NULL;
|
||||
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
|
||||
|
||||
if (pSqlFuncMsg->functionId == TSDB_FUNC_TAG_DUMMY || pSqlFuncMsg->functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
tagLen += pCtx[i].outputBytes;
|
||||
pTagCtx[num++] = &pCtx[i];
|
||||
|
@ -1340,6 +1339,8 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
|
|||
|
||||
static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
|
||||
|
||||
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interBytes, isStableQuery);
|
||||
}
|
||||
}
|
||||
|
@ -1371,11 +1372,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|||
pCtx->inputBytes = pQuery->tagColList[index].bytes;
|
||||
pCtx->inputType = pQuery->tagColList[index].type;
|
||||
}
|
||||
|
||||
} else {
|
||||
pCtx->inputBytes = pQuery->colList[index].bytes;
|
||||
pCtx->inputType = pQuery->colList[index].type;
|
||||
}
|
||||
|
||||
|
||||
assert(isValidDataType(pCtx->inputType, pCtx->inputBytes));
|
||||
pCtx->ptsOutputBuf = NULL;
|
||||
|
||||
pCtx->outputBytes = pQuery->pSelectExpr[i].bytes;
|
||||
|
@ -1524,7 +1527,7 @@ static bool isFixedOutputQuery(SQuery *pQuery) {
|
|||
static bool isPointInterpoQuery(SQuery *pQuery) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functionID = pQuery->pSelectExpr[i].base.functionId;
|
||||
if (functionID == TSDB_FUNC_INTERP/* || functionID == TSDB_FUNC_LAST_ROW*/) {
|
||||
if (functionID == TSDB_FUNC_INTERP) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1855,7 +1858,7 @@ static int32_t getRowParamForMultiRowsOutput(SQuery *pQuery, bool isSTableQuery)
|
|||
|
||||
static int32_t getNumOfRowsInResultPage(SQuery *pQuery, bool isSTableQuery) {
|
||||
int32_t rowSize = pQuery->rowSize * getRowParamForMultiRowsOutput(pQuery, isSTableQuery);
|
||||
return (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
|
||||
return (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize;
|
||||
}
|
||||
|
||||
char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWindowResult *pResult) {
|
||||
|
@ -2276,14 +2279,18 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
|
|||
pCtx[i].hasNull = true;
|
||||
pCtx[i].nStartQueryTimestamp = timestamp;
|
||||
pCtx[i].aInputElemBuf = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
|
||||
// pCtx[i].aInputElemBuf = ((char *)inputSrc->data) +
|
||||
// ((int32_t)pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) +
|
||||
// pCtx[i].outputBytes * inputIdx;
|
||||
|
||||
// in case of tag column, the tag information should be extracted from input buffer
|
||||
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG) {
|
||||
tVariantDestroy(&pCtx[i].tag);
|
||||
tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType);
|
||||
|
||||
int32_t type = pCtx[i].outputType;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
tVariantCreateFromBinary(&pCtx[i].tag, varDataVal(pCtx[i].aInputElemBuf), varDataLen(pCtx[i].aInputElemBuf), type);
|
||||
} else {
|
||||
tVariantCreateFromBinary(&pCtx[i].tag, pCtx[i].aInputElemBuf, pCtx[i].inputBytes, pCtx[i].inputType);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2561,7 +2568,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
|
||||
size_t size = taosArrayGetSize(pGroup);
|
||||
|
||||
tFilePage **buffer = (tFilePage **)pQuery->sdata;
|
||||
tFilePage **buffer = pQuery->sdata;
|
||||
int32_t * posList = calloc(size, sizeof(int32_t));
|
||||
|
||||
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
|
||||
|
@ -2691,7 +2698,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
|
|||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||
int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
|
||||
int32_t capacity = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
|
||||
|
||||
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
|
||||
int32_t pageId = -1;
|
||||
|
@ -3275,7 +3282,6 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
|
|||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
|
||||
|
||||
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
|
||||
|
@ -4233,7 +4239,15 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
initCtxOutputBuf(pRuntimeEnv);
|
||||
setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb);
|
||||
|
||||
SArray* s = tsdbGetQueriedTableIdList(pRuntimeEnv->pQueryHandle);
|
||||
assert(taosArrayGetSize(s) >= 1);
|
||||
|
||||
setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb);
|
||||
|
||||
if (isFirstLastRowQuery(pQuery)) {
|
||||
assert(taosArrayGetSize(s) == 1);
|
||||
}
|
||||
|
||||
// here we simply set the first table as current table
|
||||
pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info;
|
||||
|
@ -4802,11 +4816,12 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
|||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
if (isIntervalQuery(pQuery) ||
|
||||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) {
|
||||
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) &&
|
||||
!isFirstLastRowQuery(pQuery))) {
|
||||
multiTableQueryProcess(pQInfo);
|
||||
} else {
|
||||
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
|
||||
isGroupbyNormalCol(pQuery->pGroupbyExpr));
|
||||
isFirstLastRowQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr));
|
||||
|
||||
sequentialTableProcess(pQInfo);
|
||||
}
|
||||
|
@ -5130,9 +5145,9 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pSqlFuncExpr,
|
||||
SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) {
|
||||
*pSqlFuncExpr = NULL;
|
||||
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
|
||||
SColumnInfo* pTagCols) {
|
||||
*pExprInfo = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SExprInfo *pExprs = (SExprInfo *)calloc(1, sizeof(SExprInfo) * pQueryMsg->numOfOutput);
|
||||
|
@ -5186,8 +5201,6 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
|
|||
assert(isValidDataType(pExprs[i].type, pExprs[i].bytes));
|
||||
}
|
||||
|
||||
// get the correct result size for top/bottom query, according to the number of tags columns in selection clause
|
||||
|
||||
// TODO refactor
|
||||
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
|
||||
pExprs[i].base = *pExprMsg[i];
|
||||
|
@ -5207,7 +5220,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
|
|||
}
|
||||
|
||||
tfree(pExprMsg);
|
||||
*pSqlFuncExpr = pExprs;
|
||||
*pExprInfo = pExprs;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -5326,25 +5339,32 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
|||
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
SSqlFuncMsg *pSqlExprMsg = &pQuery->pSelectExpr[k].base;
|
||||
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM || pSqlExprMsg->colInfo.flag == TSDB_COL_TAG) {
|
||||
if (pSqlExprMsg->functionId == TSDB_FUNC_ARITHM) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// todo opt performance
|
||||
SColIndex *pColIndex = &pSqlExprMsg->colInfo;
|
||||
if (!TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
for (int32_t f = 0; f < pQuery->numOfCols; ++f) {
|
||||
int32_t f = 0;
|
||||
for (f = 0; f < pQuery->numOfCols; ++f) {
|
||||
if (pColIndex->colId == pQuery->colList[f].colId) {
|
||||
pColIndex->colIndex = f;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert (f < pQuery->numOfCols);
|
||||
} else {
|
||||
for (int32_t f = 0; f < pQuery->numOfTags; ++f) {
|
||||
int32_t f = 0;
|
||||
for (f = 0; f < pQuery->numOfTags; ++f) {
|
||||
if (pColIndex->colId == pQuery->tagColList[f].colId) {
|
||||
pColIndex->colIndex = f;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5382,7 +5402,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
pQuery->intervalTime = pQueryMsg->intervalTime;
|
||||
pQuery->slidingTime = pQueryMsg->slidingTime;
|
||||
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
|
||||
pQuery->fillType = pQueryMsg->fillType;
|
||||
pQuery->fillType = pQueryMsg->fillType;
|
||||
pQuery->numOfTags = pQueryMsg->numOfTags;
|
||||
|
||||
// todo do not allocate ??
|
||||
|
@ -5464,7 +5484,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
STableId id = *(STableId*) taosArrayGet(pa, j);
|
||||
SGroupItem item = { .id = id };
|
||||
// NOTE: compare STableIdInfo with STableId
|
||||
// not a problem at present because we only use their 1st int64_t field
|
||||
STableIdInfo* pTableId = taosArraySearch( pTableIdList, &id, compareTableIdInfo);
|
||||
if (pTableId != NULL ) {
|
||||
window.skey = pTableId->key;
|
||||
|
@ -5482,7 +5501,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo));
|
||||
|
||||
pQuery->pos = -1;
|
||||
|
||||
pQuery->window = pQueryMsg->window;
|
||||
|
||||
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
|
||||
|
@ -5796,14 +5814,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
|||
// todo handle the error
|
||||
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
|
||||
numOfGroupByCols);
|
||||
// if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
|
||||
// code = TSDB_CODE_SUCCESS;
|
||||
// qTrace("qmsg:%p no results to produce by tag filters, return directly", pQueryMsg);
|
||||
|
||||
// goto _over;
|
||||
// }
|
||||
} else {
|
||||
// groupInfo.numOfTables = taosArrayGetSize(pTableIdList);
|
||||
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
SArray* sa = taosArrayInit(groupInfo.numOfTables, sizeof(STableId));
|
||||
|
@ -6014,7 +6025,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
|||
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data);
|
||||
assert(bytes <= pExprInfo[j].bytes && type == pExprInfo[j].type);
|
||||
|
||||
char* dst = pQuery->sdata[j]->data + i * bytes;
|
||||
char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes;
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
memcpy(dst, data, varDataTLen(data));
|
||||
} else {
|
||||
|
|
|
@ -5,14 +5,12 @@
|
|||
#include "tsqlfunction.h"
|
||||
#include "queryLog.h"
|
||||
|
||||
#define DEFAULT_INTERN_BUF_SIZE 16384L
|
||||
|
||||
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
|
||||
SDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SDiskbasedResultBuf));
|
||||
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize;
|
||||
pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_PAGE_SIZE - sizeof(tFilePage)) / rowSize;
|
||||
pResBuf->numOfPages = size;
|
||||
|
||||
pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE;
|
||||
pResBuf->totalBufSize = pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE;
|
||||
pResBuf->incStep = 4;
|
||||
|
||||
// init id hash table
|
||||
|
@ -33,7 +31,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
|
|||
return TSDB_CODE_CLI_NO_DISKSPACE;
|
||||
}
|
||||
|
||||
int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE);
|
||||
int32_t ret = ftruncate(pResBuf->fd, pResBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to create tmp file: %s on disk. %s", pResBuf->path, strerror(errno));
|
||||
return TSDB_CODE_CLI_NO_DISKSPACE;
|
||||
|
@ -55,7 +53,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
|
|||
tFilePage* getResultBufferPageById(SDiskbasedResultBuf* pResultBuf, int32_t id) {
|
||||
assert(id < pResultBuf->numOfPages && id >= 0);
|
||||
|
||||
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id);
|
||||
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_PAGE_SIZE * id);
|
||||
}
|
||||
|
||||
int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
|
||||
|
@ -63,7 +61,7 @@ int32_t getNumOfResultBufGroupId(SDiskbasedResultBuf* pResultBuf) { return taosH
|
|||
int32_t getResBufSize(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
|
||||
|
||||
static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOfPages) {
|
||||
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE == pResultBuf->totalBufSize);
|
||||
assert(pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE == pResultBuf->totalBufSize);
|
||||
|
||||
int32_t ret = munmap(pResultBuf->pBuf, pResultBuf->totalBufSize);
|
||||
pResultBuf->numOfPages += numOfPages;
|
||||
|
@ -72,14 +70,14 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
|
|||
* disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
|
||||
* be insufficient
|
||||
*/
|
||||
ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE);
|
||||
ret = ftruncate(pResultBuf->fd, pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE);
|
||||
if (ret != 0) {
|
||||
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
|
||||
// strerror(errno));
|
||||
return -TSDB_CODE_SERV_NO_DISKSPACE;
|
||||
}
|
||||
|
||||
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_SIZE;
|
||||
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE;
|
||||
pResultBuf->pBuf = mmap(NULL, pResultBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResultBuf->fd, 0);
|
||||
|
||||
if (pResultBuf->pBuf == MAP_FAILED) {
|
||||
|
@ -174,7 +172,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
|
|||
tFilePage* page = getResultBufferPageById(pResultBuf, *pageId);
|
||||
|
||||
// clear memory for the new page
|
||||
memset(page, 0, DEFAULT_INTERN_BUF_SIZE);
|
||||
memset(page, 0, DEFAULT_INTERN_BUF_PAGE_SIZE);
|
||||
|
||||
return page;
|
||||
}
|
||||
|
|
|
@ -596,7 +596,7 @@ static int32_t convertToBool(tVariant *pVariant, int64_t *pDest) {
|
|||
*
|
||||
* todo handle the return value
|
||||
*/
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, char type, bool includeLengthPrefix) {
|
||||
int32_t tVariantDump(tVariant *pVariant, char *payload, int16_t type, bool includeLengthPrefix) {
|
||||
if (pVariant == NULL || (pVariant->nType != 0 && !isValidDataType(pVariant->nType, pVariant->nLen))) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -211,6 +211,22 @@ TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
|
|||
return pQueryHandle;
|
||||
}
|
||||
|
||||
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
|
||||
assert(pHandle != NULL);
|
||||
|
||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;
|
||||
|
||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
SArray* res = taosArrayInit(size, sizeof(STableId));
|
||||
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||
taosArrayPush(res, &pCheckInfo->tableId);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList) {
|
||||
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList);
|
||||
|
||||
|
@ -1461,6 +1477,9 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
|
|||
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||
if (pCheckInfo->pTableObj->tableId.uid == 12094628167747) {
|
||||
printf("abc\n");
|
||||
}
|
||||
if (pCheckInfo->pTableObj->lastKey > key) {
|
||||
key = pCheckInfo->pTableObj->lastKey;
|
||||
index = i;
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
#system sh/stop_dnodes.sh
|
||||
#
|
||||
#system sh/deploy.sh -n dnode1 -i 1
|
||||
#system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
||||
#system sh/exec.sh -n dnode1 -s start
|
||||
#
|
||||
#sleep 3000
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
sleep 3000
|
||||
sql connect
|
||||
|
||||
$dbPrefix = wh_db
|
||||
|
@ -13,39 +13,39 @@ $mtPrefix = wh_mt
|
|||
$tbNum = 10
|
||||
$rowNum = 10000
|
||||
$totalNum = $tbNum * $rowNum
|
||||
#
|
||||
#print =============== where.sim
|
||||
|
||||
print =============== where.sim
|
||||
$i = 0
|
||||
$db = $dbPrefix . $i
|
||||
$mt = $mtPrefix . $i
|
||||
#
|
||||
#sql drop database if exits $db -x step1
|
||||
#step1:
|
||||
|
||||
sql drop database if exits $db -x step1
|
||||
step1:
|
||||
sql create database if not exists $db maxTables 4
|
||||
sql use $db
|
||||
#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
|
||||
#
|
||||
#$i = 0
|
||||
#while $i < $tbNum
|
||||
# $tb = $tbPrefix . $i
|
||||
# sql create table $tb using $mt tags( $i )
|
||||
#
|
||||
# $x = 0
|
||||
# while $x < $rowNum
|
||||
# $ms = $x . m
|
||||
# $c = $x / 100
|
||||
# $c = $c * 100
|
||||
# $c = $x - $c
|
||||
# $binary = 'binary . $c
|
||||
# $binary = $binary . '
|
||||
# $nchar = 'nchar . $c
|
||||
# $nchar = $nchar . '
|
||||
# sql insert into $tb values (now + $ms , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
|
||||
# $x = $x + 1
|
||||
# endw
|
||||
#
|
||||
# $i = $i + 1
|
||||
#endw
|
||||
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
|
||||
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using $mt tags( $i )
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$ms = $x . m
|
||||
$c = $x / 100
|
||||
$c = $c * 100
|
||||
$c = $x - $c
|
||||
$binary = 'binary . $c
|
||||
$binary = $binary . '
|
||||
$nchar = 'nchar . $c
|
||||
$nchar = $nchar . '
|
||||
sql insert into $tb values (now + $ms , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
sleep 100
|
||||
|
||||
|
|
Loading…
Reference in New Issue