[td-225]fix bug for arithmetic calculation on aggregated results.
This commit is contained in:
parent
2b3df7d33e
commit
a12e0d7320
|
@ -203,7 +203,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
|
|||
|
||||
void tscSetFreeHeatBeat(STscObj* pObj);
|
||||
bool tscShouldFreeHeatBeat(SSqlObj* pHb);
|
||||
void tscCleanSqlCmd(SSqlCmd* pCmd);
|
||||
bool tscShouldBeFreed(SSqlObj* pSql);
|
||||
|
||||
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
|
||||
|
@ -220,7 +219,6 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
|
|||
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
|
||||
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
|
||||
|
||||
void tscFreeQueryInfo(SSqlCmd* pCmd);
|
||||
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
|
||||
|
||||
void tscClearSubqueryInfo(SSqlCmd* pCmd);
|
||||
|
|
|
@ -88,7 +88,7 @@ typedef struct SSqlExpr {
|
|||
int16_t functionId; // function id in aAgg array
|
||||
int16_t resType; // return value type
|
||||
int16_t resBytes; // length of return value
|
||||
int16_t interResBytes; // inter result buffer size
|
||||
int16_t interBytes; // inter result buffer size
|
||||
int16_t numOfParams; // argument value of each function
|
||||
tVariant param[3]; // parameters are not more than 3
|
||||
int32_t offset; // sub result column value of arithmetic expression.
|
||||
|
@ -283,6 +283,8 @@ typedef struct {
|
|||
int32_t* length; // length for each field for current row
|
||||
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
|
||||
SColumnIndex * pColumnIndex;
|
||||
SArithmeticSupport* pArithSup; // support the arithmetic expression calculation on agg functions
|
||||
|
||||
struct SLocalReducer *pLocalReducer;
|
||||
} SSqlRes;
|
||||
|
||||
|
|
|
@ -153,7 +153,7 @@ typedef struct SRateInfo {
|
|||
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||
int16_t *bytes, int16_t *interResBytes, int16_t extLength, bool isSuperTable) {
|
||||
int16_t *bytes, int16_t *interBytes, int16_t extLength, bool isSuperTable) {
|
||||
if (!isValidDataType(dataType, dataBytes)) {
|
||||
tscError("Illegal data type %d or data type length %d", dataType, dataBytes);
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
|
@ -164,35 +164,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) {
|
||||
*type = (int16_t)dataType;
|
||||
*bytes = (int16_t)dataBytes;
|
||||
*interResBytes = *bytes + sizeof(SResultInfo);
|
||||
*interBytes = *bytes + sizeof(SResultInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); // (uid, tid) + VGID + TAGSIZE
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_COUNT) {
|
||||
*type = TSDB_DATA_TYPE_BIGINT;
|
||||
*bytes = sizeof(int64_t);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_ARITHM) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_TS_COMP) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(int32_t); // this results is compressed ts data
|
||||
*interResBytes = POINTER_BYTES;
|
||||
*interBytes = POINTER_BYTES;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -200,54 +200,54 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = dataBytes + DATA_SET_FLAG_SIZE;
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_SUM) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(SSumInfo);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_AVG) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(SAvgInfo);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(SRateInfo);
|
||||
*interResBytes = sizeof(SRateInfo);
|
||||
*interBytes = sizeof(SRateInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_SPREAD) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(SSpreadInfo);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_APERCT) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_LAST_ROW) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = sizeof(SLastrowInfo) + dataBytes;
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_TWA) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(STwaInfo);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -260,57 +260,57 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
}
|
||||
|
||||
*bytes = sizeof(int64_t);
|
||||
*interResBytes = sizeof(SSumInfo);
|
||||
*interBytes = sizeof(SSumInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_APERCT) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes =
|
||||
*interBytes =
|
||||
sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (functionId == TSDB_FUNC_TWA) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes = sizeof(STwaInfo);
|
||||
*interBytes = sizeof(STwaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_AVG) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes = sizeof(SAvgInfo);
|
||||
*interBytes = sizeof(SAvgInfo);
|
||||
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes = sizeof(SRateInfo);
|
||||
*interBytes = sizeof(SRateInfo);
|
||||
} else if (functionId == TSDB_FUNC_STDDEV) {
|
||||
*type = TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes = sizeof(SStddevInfo);
|
||||
*interBytes = sizeof(SStddevInfo);
|
||||
} else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
|
||||
*type = (int16_t)dataType;
|
||||
*bytes = (int16_t)dataBytes;
|
||||
*interResBytes = dataBytes + DATA_SET_FLAG_SIZE;
|
||||
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
|
||||
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
|
||||
*type = (int16_t)dataType;
|
||||
*bytes = (int16_t)dataBytes;
|
||||
*interResBytes = dataBytes + sizeof(SResultInfo);
|
||||
*interBytes = dataBytes + sizeof(SResultInfo);
|
||||
} else if (functionId == TSDB_FUNC_SPREAD) {
|
||||
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = sizeof(double);
|
||||
*interResBytes = sizeof(SSpreadInfo);
|
||||
*interBytes = sizeof(SSpreadInfo);
|
||||
} else if (functionId == TSDB_FUNC_PERCT) {
|
||||
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
|
||||
*bytes = (int16_t)sizeof(double);
|
||||
*interResBytes = (int16_t)sizeof(double);
|
||||
*interBytes = (int16_t)sizeof(double);
|
||||
} else if (functionId == TSDB_FUNC_LEASTSQR) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
|
||||
*interResBytes = *bytes + sizeof(SResultInfo);
|
||||
*interBytes = *bytes + sizeof(SResultInfo);
|
||||
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = dataBytes + sizeof(SFirstLastInfo);
|
||||
*interResBytes = *bytes;
|
||||
*interBytes = *bytes;
|
||||
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||
*type = (int16_t)dataType;
|
||||
*bytes = (int16_t)dataBytes;
|
||||
|
@ -318,11 +318,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
|
||||
|
||||
// the output column may be larger than sizeof(STopBotInfo)
|
||||
*interResBytes = size;
|
||||
*interBytes = size;
|
||||
} else if (functionId == TSDB_FUNC_LAST_ROW) {
|
||||
*type = (int16_t)dataType;
|
||||
*bytes = (int16_t)dataBytes;
|
||||
*interResBytes = dataBytes + sizeof(SLastrowInfo);
|
||||
*interBytes = dataBytes + sizeof(SLastrowInfo);
|
||||
} else {
|
||||
return TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
|
|
@ -960,6 +960,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
*sqlstr = sql;
|
||||
}
|
||||
|
||||
if (*sqlstr == NULL) {
|
||||
code = TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -1198,48 +1198,26 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
|
|||
columnList.num = 0;
|
||||
columnList.ids[0] = (SColumnIndex) {0, 0};
|
||||
|
||||
insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, "abc", NULL);
|
||||
insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, "dummy_column", NULL);
|
||||
|
||||
int32_t slot = tscNumOfFields(pQueryInfo) - 1;
|
||||
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, slot);
|
||||
|
||||
if (pInfo->pSqlExpr == NULL) {
|
||||
SExprInfo* pFuncExpr = calloc(1, sizeof(SExprInfo));
|
||||
pInfo->pArithExprInfo = pFuncExpr;
|
||||
SExprInfo* pArithExprInfo = calloc(1, sizeof(SExprInfo));
|
||||
|
||||
// arithmetic expression always return result in the format of double float
|
||||
pFuncExpr->bytes = sizeof(double);
|
||||
pFuncExpr->interResBytes = sizeof(double);
|
||||
pFuncExpr->type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pArithExprInfo->bytes = sizeof(double);
|
||||
pArithExprInfo->interBytes = sizeof(double);
|
||||
pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE;
|
||||
|
||||
tExprNode* pNode = NULL;
|
||||
// SArray* colList = taosArrayInit(10, sizeof(SColIndex));
|
||||
|
||||
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, NULL);
|
||||
int32_t ret = exprTreeFromSqlExpr(&pArithExprInfo->pExpr, pItem->pNode, pQueryInfo->exprList, pQueryInfo, NULL);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
tExprTreeDestroy(&pNode, NULL);
|
||||
tExprTreeDestroy(&pArithExprInfo->pExpr, NULL);
|
||||
return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause");
|
||||
}
|
||||
|
||||
pFuncExpr->pExpr = pNode;
|
||||
assert(0);
|
||||
// pExprInfo->pReqColumns = pColIndex;
|
||||
|
||||
// for(int32_t k = 0; k < pFuncExpr->numOfCols; ++k) {
|
||||
// SColIndex* pCol = &pFuncExpr->colList[k];
|
||||
// size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
//
|
||||
// for(int32_t f = 0; f < size; ++f) {
|
||||
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f);
|
||||
// if (strcmp(pExpr->aliasName, pCol->name) == 0) {
|
||||
// pCol->colIndex = f;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// assert(pCol->colIndex >= 0 && pCol->colIndex < size);
|
||||
// tfree(pNode);
|
||||
// }
|
||||
pInfo->pArithExprInfo = pArithExprInfo;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -2278,7 +2256,7 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo* pQueryInfo) {
|
|||
|
||||
tscSqlExprUpdate(pQueryInfo, k, functionId, pExpr->colInfo.colIndex, TSDB_DATA_TYPE_BINARY, bytes);
|
||||
// todo refactor
|
||||
pExpr->interResBytes = intermediateBytes;
|
||||
pExpr->interBytes = intermediateBytes;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4937,7 +4915,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
|
|||
if (pExpr->functionId != TSDB_FUNC_TAG_DUMMY && pExpr->functionId != TSDB_FUNC_TS_DUMMY) {
|
||||
SSchema* pColSchema = &pSchema[pExpr->colInfo.colIndex];
|
||||
getResultDataInfo(pColSchema->type, pColSchema->bytes, pExpr->functionId, pExpr->param[0].i64Key, &pExpr->resType,
|
||||
&pExpr->resBytes, &pExpr->interResBytes, tagLength, true);
|
||||
&pExpr->resBytes, &pExpr->interBytes, tagLength, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
|
|||
}
|
||||
|
||||
SResultInfo *pResInfo = &pReducer->pResInfo[i];
|
||||
pResInfo->bufLen = pExpr->interResBytes;
|
||||
pResInfo->bufLen = pExpr->interBytes;
|
||||
pResInfo->interResultBuf = calloc(1, (size_t)pResInfo->bufLen);
|
||||
|
||||
pCtx->resultInfo = &pReducer->pResInfo[i];
|
||||
|
|
|
@ -2363,8 +2363,10 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
|
|||
pRes->data = pRetrieve->data;
|
||||
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||
tscSetResultPointer(pQueryInfo, pRes);
|
||||
|
||||
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
|
||||
return pRes->code;
|
||||
}
|
||||
|
||||
if (pSql->pSubscription != NULL) {
|
||||
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
|
||||
|
||||
|
|
|
@ -433,23 +433,6 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
|
|||
return (pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
|
||||
// SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
|
||||
// SExprInfo * pExpr = pSupport->pArithExpr;
|
||||
|
||||
// int32_t index = -1;
|
||||
// for (int32_t i = 0; i < pExpr->numOfCols; ++i) {
|
||||
// if (strcmp(name, pExpr->colList[i].name) == 0) {
|
||||
// index = i;
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// assert(index >= 0 && index < pExpr->numOfCols);
|
||||
// return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
||||
SSqlObj* pSql = (SSqlObj*) tres;
|
||||
|
||||
|
@ -878,7 +861,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
|||
|
||||
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
|
||||
// must before clean the sqlcmd object
|
||||
tscCleanSqlCmd(&pSql->cmd);
|
||||
tscResetSqlCmdObj(&pSql->cmd);
|
||||
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
|
|
|
@ -508,7 +508,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
|||
SSqlInfo SQLInfo = {0};
|
||||
tSQLParse(&SQLInfo, pSql->sqlstr);
|
||||
|
||||
tscCleanSqlCmd(&pSql->cmd);
|
||||
tscResetSqlCmdObj(&pSql->cmd);
|
||||
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
|
||||
if (TSDB_CODE_SUCCESS != ret) {
|
||||
setErrorInfo(pObj, ret, NULL);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "tscSubquery.h"
|
||||
#include <qast.h>
|
||||
#include <tcompare.h>
|
||||
#include <tschemautil.h>
|
||||
#include "os.h"
|
||||
|
@ -1924,7 +1925,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
|
|||
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
if (isNull(pRes->tsrow[columnIndex], pField->type)) {
|
||||
if (pRes->tsrow[columnIndex] != NULL && isNull(pRes->tsrow[columnIndex], pField->type)) {
|
||||
pRes->tsrow[columnIndex] = NULL;
|
||||
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
|
||||
|
@ -1944,6 +1945,24 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
|
|||
}
|
||||
}
|
||||
|
||||
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
|
||||
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
|
||||
|
||||
int32_t index = -1;
|
||||
SSqlExpr* pExpr = NULL;
|
||||
|
||||
for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
|
||||
pExpr = taosArrayGetP(pSupport->exprList, i);
|
||||
if (strncmp(name, pExpr->aliasName, TSDB_COL_NAME_LEN) == 0) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert(index >= 0 && index < pSupport->numOfCols);
|
||||
return pSupport->data[index] + pSupport->offset * pExpr->resBytes;
|
||||
}
|
||||
|
||||
void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
@ -1981,27 +2000,30 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
|
|||
|
||||
// calculate the result from several other columns
|
||||
if (pSup->pArithExprInfo != NULL) {
|
||||
// SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
|
||||
// sas->offset = 0;
|
||||
// sas-> = pQueryInfo->fieldsInfo.pExpr[i];
|
||||
//
|
||||
// sas->numOfCols = sas->pExpr->binExprInfo.numOfCols;
|
||||
//
|
||||
// if (pRes->buffer[i] == NULL) {
|
||||
// pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
|
||||
// }
|
||||
//
|
||||
// for(int32_t k = 0; k < sas->numOfCols; ++k) {
|
||||
// int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf;
|
||||
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
|
||||
//
|
||||
// sas->elemSize[k] = pExpr->resBytes;
|
||||
// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
|
||||
// }
|
||||
//
|
||||
// tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
|
||||
// pRes->tsrow[i] = pRes->buffer[i];
|
||||
//
|
||||
if (pRes->pArithSup == NULL) {
|
||||
SArithmeticSupport *sas = (SArithmeticSupport *) calloc(1, sizeof(SArithmeticSupport));
|
||||
sas->offset = 0;
|
||||
sas->pArithExpr = pSup->pArithExprInfo;
|
||||
sas->numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
sas->exprList = pQueryInfo->exprList;
|
||||
sas->data = calloc(sas->numOfCols, POINTER_BYTES);
|
||||
|
||||
pRes->pArithSup = sas;
|
||||
}
|
||||
|
||||
if (pRes->buffer[i] == NULL) {
|
||||
TAOS_FIELD* field = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
|
||||
pRes->buffer[i] = malloc(field->bytes);
|
||||
}
|
||||
|
||||
for(int32_t k = 0; k < pRes->pArithSup->numOfCols; ++k) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
|
||||
pRes->pArithSup->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
|
||||
}
|
||||
|
||||
tExprTreeCalcTraverse(pRes->pArithSup->pArithExpr->pExpr, 1, pRes->buffer[i], pRes->pArithSup,
|
||||
TSDB_ORDER_ASC, getArithemicInputSrc);
|
||||
pRes->tsrow[i] = pRes->buffer[i];
|
||||
// free(sas); //todo optimization
|
||||
}
|
||||
}
|
||||
|
@ -2010,7 +2032,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
|
|||
return pRes->tsrow;
|
||||
}
|
||||
|
||||
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
|
||||
static UNUSED_FUNC bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
|
||||
bool hasData = true;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
|
|
|
@ -30,7 +30,10 @@
|
|||
#include "ttokendef.h"
|
||||
#include "tscLog.h"
|
||||
|
||||
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
|
||||
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
|
||||
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
|
||||
|
||||
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
|
||||
if (pTagCond->pCond == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -309,6 +312,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
|||
for (int i = 0; i < pRes->numOfCols; i++) {
|
||||
tfree(pRes->buffer[i]);
|
||||
}
|
||||
|
||||
pRes->numOfCols = 0;
|
||||
}
|
||||
|
||||
|
@ -320,9 +324,32 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
|||
tfree(pRes->pColumnIndex);
|
||||
tfree(pRes->buffer);
|
||||
|
||||
if (pRes->pArithSup != NULL) {
|
||||
tfree(pRes->pArithSup->data);
|
||||
tfree(pRes->pArithSup);
|
||||
}
|
||||
|
||||
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
|
||||
}
|
||||
|
||||
static void tscFreeQueryInfo(SSqlCmd* pCmd) {
|
||||
if (pCmd == NULL || pCmd->numOfClause == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
|
||||
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
|
||||
|
||||
freeQueryInfoImpl(pQueryInfo);
|
||||
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
|
||||
tfree(pQueryInfo);
|
||||
}
|
||||
|
||||
pCmd->numOfClause = 0;
|
||||
tfree(pCmd->pQueryInfo);
|
||||
}
|
||||
|
||||
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
|
||||
pCmd->command = 0;
|
||||
pCmd->numOfCols = 0;
|
||||
|
@ -332,9 +359,10 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
|
|||
pCmd->parseFinished = 0;
|
||||
|
||||
taosHashCleanup(pCmd->pTableList);
|
||||
pCmd->pTableList= NULL;
|
||||
pCmd->pTableList = NULL;
|
||||
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
|
||||
tscFreeQueryInfo(pCmd);
|
||||
}
|
||||
|
||||
|
@ -343,6 +371,7 @@ void tscFreeSqlResult(SSqlObj* pSql) {
|
|||
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
tscDestroyResPointerInfo(pRes);
|
||||
|
||||
memset(&pSql->res, 0, sizeof(SSqlRes));
|
||||
}
|
||||
|
||||
|
@ -366,8 +395,8 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
|
|||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
tscFreeSqlResult(pSql);
|
||||
tfree(pSql->pSubs);
|
||||
|
||||
tfree(pSql->pSubs);
|
||||
pSql->freed = 0;
|
||||
pSql->numOfSubs = 0;
|
||||
|
||||
|
@ -913,6 +942,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
|
|||
|
||||
if (pInfo->pArithExprInfo != NULL) {
|
||||
tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
|
||||
tfree(pInfo->pArithExprInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -947,7 +977,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
|
|||
pExpr->colInfo.colIndex = pColIndex->columnIndex;
|
||||
pExpr->resType = type;
|
||||
pExpr->resBytes = size;
|
||||
pExpr->interResBytes = interSize;
|
||||
pExpr->interBytes = interSize;
|
||||
pExpr->uid = pTableMetaInfo->pTableMeta->uid;
|
||||
|
||||
return pExpr;
|
||||
|
@ -1422,20 +1452,6 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
|
|||
return pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE;
|
||||
}
|
||||
|
||||
void tscCleanSqlCmd(SSqlCmd* pCmd) {
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
tscFreeQueryInfo(pCmd);
|
||||
|
||||
uint32_t allocSize = pCmd->allocSize;
|
||||
char* allocPtr = pCmd->payload;
|
||||
|
||||
memset(pCmd, 0, sizeof(SSqlCmd));
|
||||
|
||||
// restore values
|
||||
pCmd->allocSize = allocSize;
|
||||
pCmd->payload = allocPtr;
|
||||
}
|
||||
|
||||
/*
|
||||
* the following three kinds of SqlObj should not be freed
|
||||
* 1. SqlObj for stream computing
|
||||
|
@ -1630,24 +1646,6 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
|
|||
tfree(pQueryInfo->pTableMetaInfo);
|
||||
}
|
||||
|
||||
void tscFreeQueryInfo(SSqlCmd* pCmd) {
|
||||
if (pCmd == NULL || pCmd->numOfClause == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
|
||||
char* addr = (char*)pCmd - offsetof(SSqlObj, cmd);
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
|
||||
|
||||
freeQueryInfoImpl(pQueryInfo);
|
||||
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
|
||||
tfree(pQueryInfo);
|
||||
}
|
||||
|
||||
pCmd->numOfClause = 0;
|
||||
tfree(pCmd->pQueryInfo);
|
||||
}
|
||||
|
||||
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
|
||||
SVgroupsInfo* vgroupList, SArray* pTagCols) {
|
||||
void* pAlloc = realloc(pQueryInfo->pTableMetaInfo, (pQueryInfo->numOfTables + 1) * POINTER_BYTES);
|
||||
|
|
|
@ -228,7 +228,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
|||
int32_t num = taosGetQueueNumber(pWorker->qset);
|
||||
|
||||
if (num > 0) {
|
||||
usleep(30000);
|
||||
usleep(30);
|
||||
sched_yield();
|
||||
} else {
|
||||
taosFreeQall(pWorker->qall);
|
||||
|
|
|
@ -385,7 +385,7 @@ typedef struct SExprInfo {
|
|||
struct tExprNode* pExpr;
|
||||
int16_t bytes;
|
||||
int16_t type;
|
||||
int16_t interResBytes;
|
||||
int16_t interBytes;
|
||||
} SExprInfo;
|
||||
|
||||
typedef struct SColumnFilterInfo {
|
||||
|
|
|
@ -119,6 +119,7 @@ typedef struct SArithmeticSupport {
|
|||
SExprInfo *pArithExpr;
|
||||
int32_t numOfCols;
|
||||
SColumnInfo *colList;
|
||||
SArray* exprList; // client side used
|
||||
int32_t offset;
|
||||
char** data;
|
||||
} SArithmeticSupport;
|
||||
|
@ -220,7 +221,7 @@ typedef struct SQLAggFuncElem {
|
|||
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
||||
|
||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||
int16_t *len, int16_t *interResBytes, int16_t extLength, bool isSuperTable);
|
||||
int16_t *len, int16_t *interBytes, int16_t extLength, bool isSuperTable);
|
||||
|
||||
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
|
||||
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
|
||||
|
|
|
@ -979,7 +979,8 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
|
|||
|
||||
} else if (pRight->nodeType == TSQL_NODE_COL) { // 12 + columnRight
|
||||
// column data specified on right-hand-side
|
||||
char * pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
|
||||
char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId);
|
||||
|
||||
_bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr);
|
||||
fp(&pLeft->pVal->i64Key, pRightInputData, 1, numOfRows, pOutput, order);
|
||||
|
||||
|
|
|
@ -1402,7 +1402,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
|
|||
|
||||
static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interResBytes, isStableQuery);
|
||||
setResultInfoBuf(&pResultInfo[i], pQuery->pSelectExpr[i].interBytes, isStableQuery);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5540,7 +5540,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
|
|||
|
||||
int32_t param = pExprs[i].base.arg[0].argValue.i64;
|
||||
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
|
||||
&pExprs[i].interResBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_INVALID_QUERY_MSG;
|
||||
}
|
||||
|
@ -5566,7 +5566,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
|
|||
|
||||
int32_t ret =
|
||||
getResultDataInfo(pCol->type, pCol->bytes, functId, pExprs[i].base.arg[0].argValue.i64,
|
||||
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interResBytes, tagLen, isSuperTable);
|
||||
&pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable);
|
||||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
@ -5780,10 +5780,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
pQuery->rec.threshold = 4000;
|
||||
|
||||
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
|
||||
assert(pExprs[col].interResBytes >= pExprs[col].bytes);
|
||||
assert(pExprs[col].interBytes >= pExprs[col].bytes);
|
||||
|
||||
// allocate additional memory for interResults that are usually larger then final results
|
||||
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interResBytes + sizeof(SData);
|
||||
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(SData);
|
||||
pQuery->sdata[col] = (SData *)calloc(1, size);
|
||||
if (pQuery->sdata[col] == NULL) {
|
||||
goto _cleanup;
|
||||
|
|
|
@ -89,5 +89,4 @@ endi
|
|||
|
||||
#### illegal operations
|
||||
sql_error select max(c2*2) from $tb
|
||||
sql_error select 2*min(c1) from $tb
|
||||
sql_error select max(c1-c2) from $tb
|
||||
|
|
Loading…
Reference in New Issue