commit
cfac2af4fd
|
@ -130,11 +130,11 @@ typedef struct STopBotInfo {
|
|||
} STopBotInfo;
|
||||
|
||||
// leastsquares do not apply to super table
|
||||
typedef struct SLeastsquareInfo {
|
||||
typedef struct SLeastsquaresInfo {
|
||||
double mat[2][3];
|
||||
double startVal;
|
||||
int64_t num;
|
||||
} SLeastsquareInfo;
|
||||
} SLeastsquaresInfo;
|
||||
|
||||
typedef struct SAPercentileInfo {
|
||||
SHistogramInfo *pHisto;
|
||||
|
@ -316,7 +316,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
|||
*interBytes = (int16_t)sizeof(SPercentileInfo);
|
||||
} else if (functionId == TSDB_FUNC_LEASTSQR) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
*bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
|
||||
*bytes = MAX(TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE, sizeof(SLeastsquaresInfo)); // string
|
||||
*interBytes = *bytes;
|
||||
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
|
||||
*type = TSDB_DATA_TYPE_BINARY;
|
||||
|
@ -2756,7 +2756,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
// 2*3 matrix
|
||||
pInfo->startVal = pCtx->param[0].dKey;
|
||||
|
@ -2783,7 +2783,7 @@ static bool leastsquares_function_setup(SQLFunctionCtx *pCtx) {
|
|||
|
||||
static void leastsquares_function(SQLFunctionCtx *pCtx) {
|
||||
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
double(*param)[3] = pInfo->mat;
|
||||
double x = pInfo->startVal;
|
||||
|
@ -2853,40 +2853,40 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
return;
|
||||
}
|
||||
|
||||
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
double(*param)[3] = pInfo->mat;
|
||||
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t *p = pData;
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *p = pData;
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t *p = pData;
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t *p = pData;
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float *p = pData;
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double *p = pData;
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, index, pCtx->param[1].dKey);
|
||||
LEASTSQR_CAL(param, pInfo->startVal, p, 0, pCtx->param[1].dKey);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -2904,15 +2904,10 @@ static void leastsquares_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
|
||||
// no data in query
|
||||
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SLeastsquareInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
SLeastsquaresInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pInfo->num == 0) {
|
||||
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
|
||||
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
|
||||
} else {
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
}
|
||||
|
||||
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -171,11 +171,10 @@ typedef struct SQuery {
|
|||
|
||||
typedef struct SQueryRuntimeEnv {
|
||||
jmp_buf env;
|
||||
SResultRow* pResultRow; // todo refactor to merge with SWindowResInfo
|
||||
SQuery* pQuery;
|
||||
SQLFunctionCtx* pCtx;
|
||||
int32_t numOfRowsPerPage;
|
||||
uint16_t offset[TSDB_MAX_COLUMNS];
|
||||
uint16_t* offset;
|
||||
uint16_t scanFlag; // denotes reversed scan of data or not
|
||||
SFillInfo* pFillInfo;
|
||||
SWindowResInfo windowResInfo;
|
||||
|
|
|
@ -660,7 +660,13 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
|
|||
*/
|
||||
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (pRuntimeEnv->scanFlag != MASTER_SCAN || (!QUERY_IS_INTERVAL_QUERY(pQuery))) {
|
||||
if (pRuntimeEnv->scanFlag != MASTER_SCAN) {
|
||||
return pWindowResInfo->size;
|
||||
}
|
||||
|
||||
// for group by normal column query, close time window and return.
|
||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
closeAllTimeWindow(pWindowResInfo);
|
||||
return pWindowResInfo->size;
|
||||
}
|
||||
|
||||
|
@ -1251,7 +1257,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
|||
return QUERY_IS_ASC_QUERY(pQuery);
|
||||
}
|
||||
|
||||
// todo add comments
|
||||
// denote the order type
|
||||
if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) {
|
||||
return pCtx->param[0].i64Key == pQuery->order.order;
|
||||
}
|
||||
|
@ -1448,7 +1454,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
|
||||
// interval query with limit applied
|
||||
int32_t numOfRes = 0;
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
|
||||
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
||||
} else {
|
||||
numOfRes = (int32_t)getNumOfResult(pRuntimeEnv);
|
||||
|
@ -1621,10 +1627,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
|
||||
pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t));
|
||||
pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t));
|
||||
pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool);
|
||||
|
||||
if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
|
||||
if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) {
|
||||
goto _clean;
|
||||
}
|
||||
|
||||
|
@ -1664,15 +1670,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|||
assert(isValidDataType(pCtx->inputType));
|
||||
pCtx->ptsOutputBuf = NULL;
|
||||
|
||||
pCtx->outputBytes = pQuery->pExpr1[i].bytes;
|
||||
pCtx->outputType = pQuery->pExpr1[i].type;
|
||||
pCtx->outputBytes = pQuery->pExpr1[i].bytes;
|
||||
pCtx->outputType = pQuery->pExpr1[i].type;
|
||||
|
||||
pCtx->order = pQuery->order.order;
|
||||
pCtx->functionId = pSqlFuncMsg->functionId;
|
||||
pCtx->stableQuery = pRuntimeEnv->stableQuery;
|
||||
pCtx->order = pQuery->order.order;
|
||||
pCtx->functionId = pSqlFuncMsg->functionId;
|
||||
pCtx->stableQuery = pRuntimeEnv->stableQuery;
|
||||
pCtx->interBufBytes = pQuery->pExpr1[i].interBytes;
|
||||
|
||||
pCtx->numOfParams = pSqlFuncMsg->numOfParams;
|
||||
pCtx->numOfParams = pSqlFuncMsg->numOfParams;
|
||||
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
|
||||
int16_t type = pSqlFuncMsg->arg[j].argType;
|
||||
int16_t bytes = pSqlFuncMsg->arg[j].argBytes;
|
||||
|
@ -1720,6 +1726,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
|
|||
|
||||
_clean:
|
||||
tfree(pRuntimeEnv->pCtx);
|
||||
tfree(pRuntimeEnv->offset);
|
||||
tfree(pRuntimeEnv->rowCellInfoOffset);
|
||||
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -1769,6 +1777,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
doFreeQueryHandle(pQInfo);
|
||||
|
||||
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
|
||||
|
||||
tfree(pRuntimeEnv->offset);
|
||||
tfree(pRuntimeEnv->keyBuf);
|
||||
tfree(pRuntimeEnv->rowCellInfoOffset);
|
||||
|
||||
|
@ -2409,7 +2419,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
|
|||
assert(bytes > 0 && capacity > 0);
|
||||
|
||||
char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage));
|
||||
if (tmp == NULL) { // todo handle the oom
|
||||
if (tmp == NULL) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
} else {
|
||||
pQuery->sdata[i] = (tFilePage *)tmp;
|
||||
|
@ -2440,7 +2450,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
|
|||
assert(bytes > 0 && newSize > 0);
|
||||
|
||||
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
|
||||
if (tmp == NULL) { // todo handle the oom
|
||||
if (tmp == NULL) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
} else {
|
||||
memset(tmp + sizeof(tFilePage) + bytes * pRec->rows, 0, (size_t)((newSize - pRec->rows) * bytes));
|
||||
|
@ -3331,7 +3341,13 @@ int32_t initResultRow(SResultRow *pResultRow) {
|
|||
|
||||
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
SResultRow* pRow = pRuntimeEnv->pResultRow;
|
||||
|
||||
SResultRow* pRow = NULL;
|
||||
// if (pRuntimeEnv->windowResInfo.size == 0) {
|
||||
int32_t groupIndex = 0;
|
||||
int32_t uid = 0;
|
||||
pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid);
|
||||
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||
|
@ -4613,12 +4629,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
|
||||
}
|
||||
|
||||
// create runtime environment
|
||||
code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t ps = DEFAULT_PAGE_SIZE;
|
||||
int32_t rowsize = 0;
|
||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
||||
|
@ -4650,7 +4660,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
return code;
|
||||
}
|
||||
}
|
||||
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
} else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) {
|
||||
int32_t numOfResultRows = getInitialPageNum(pQInfo);
|
||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
||||
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo);
|
||||
|
@ -4671,6 +4681,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
}
|
||||
}
|
||||
|
||||
// create runtime environment
|
||||
code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pQuery);
|
||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||
|
|
|
@ -165,7 +165,7 @@ static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
|
|||
|
||||
static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
assert((int64_t)pResultBuf->numOfPages * pResultBuf->pageSize == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages);
|
||||
assert(((int64_t) pResultBuf->numOfPages * pResultBuf->pageSize) == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages);
|
||||
|
||||
if (pResultBuf->file == NULL) {
|
||||
if ((ret = createDiskFile(pResultBuf)) != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -533,6 +533,69 @@ if $data03 != 99.000000000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print ============>td-1765
|
||||
sql select percentile(c4, 49),min(c4),max(c4),avg(c4),stddev(c4) from group_tb0 group by c8;
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 4851.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 9900 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 4950.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 2886.607004772 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 4852.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 9901 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 4951.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 2886.607004772 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print ================>td-2090
|
||||
sql select leastsquares(c2, 1, 1) from group_tb1 group by c8;
|
||||
if $rows != 100 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @{slop:0.000000, intercept:0.000000}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != @{slop:0.000000, intercept:1.000000}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != @{slop:0.000000, intercept:9.000000}@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#=========================== group by multi tags ======================
|
||||
sql create table st (ts timestamp, c int) tags (t1 int, t2 int, t3 int, t4 int);
|
||||
|
|
Loading…
Reference in New Issue