[td-4314]<enhance>:support top/bottom query on main query.
This commit is contained in:
parent
64efcb996d
commit
de15453c0e
|
@ -7422,6 +7422,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
const char* msg1 = "point interpolation query needs timestamp";
|
||||
const char* msg2 = "too many tables in from clause";
|
||||
const char* msg3 = "start(end) time of query range required or time range too large";
|
||||
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
|
||||
const char* msg9 = "only tag query not compatible with normal column filter";
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -7480,9 +7481,16 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
} else {
|
||||
if (isTimeWindowQuery(pQueryInfo) &&
|
||||
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
if (isTimeWindowQuery(pQueryInfo)) {
|
||||
// check if the first column of the nest query result is timestamp column
|
||||
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, 0);
|
||||
if (pCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
if (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -477,7 +477,6 @@ int doBuildAndSendMsg(SSqlObj *pSql) {
|
|||
pCmd->command == TSDB_SQL_INSERT ||
|
||||
pCmd->command == TSDB_SQL_CONNECT ||
|
||||
pCmd->command == TSDB_SQL_HB ||
|
||||
// pCmd->command == TSDB_SQL_META ||
|
||||
pCmd->command == TSDB_SQL_STABLEVGROUP) {
|
||||
pRes->code = tscBuildMsg[pCmd->command](pSql, NULL);
|
||||
}
|
||||
|
|
|
@ -627,7 +627,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
|
|||
|
||||
char *z = NULL;
|
||||
if (len > 0) {
|
||||
z = strstr(pCmd->payload, "invalid SQL");
|
||||
z = strstr(pCmd->payload, "invalid operation");
|
||||
if (z == NULL) {
|
||||
z = strstr(pCmd->payload, "syntax error");
|
||||
}
|
||||
|
|
|
@ -2578,13 +2578,14 @@ static void top_function(SQLFunctionCtx *pCtx) {
|
|||
|
||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||
char *data = GET_INPUT_DATA(pCtx, i);
|
||||
TSKEY ts = GET_TS_DATA(pCtx, i);
|
||||
|
||||
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
notNullElems++;
|
||||
|
||||
// NOTE: Set the default timestamp if it is missing [todo refactor]
|
||||
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
|
||||
do_top_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
|
||||
}
|
||||
|
||||
|
@ -2657,13 +2658,13 @@ static void bottom_function(SQLFunctionCtx *pCtx) {
|
|||
|
||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||
char *data = GET_INPUT_DATA(pCtx, i);
|
||||
TSKEY ts = GET_TS_DATA(pCtx, i);
|
||||
|
||||
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
notNullElems++;
|
||||
// NOTE: Set the default timestamp if it is missing [todo refactor]
|
||||
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
|
||||
do_bottom_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL, 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -947,7 +947,13 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
|
|||
uint32_t status = aAggs[pCtx[i].functionId].status;
|
||||
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
|
||||
SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
pCtx[i].ptsList = (int64_t*) tsInfo->pData;
|
||||
// In case of the top/bottom query again the nest query result, which has no timestamp column
|
||||
// don't set the ptsList attribute.
|
||||
if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
pCtx[i].ptsList = (int64_t*) tsInfo->pData;
|
||||
} else {
|
||||
pCtx[i].ptsList = NULL;
|
||||
}
|
||||
}
|
||||
} else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
|
||||
SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
|
||||
|
@ -4213,6 +4219,10 @@ static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBl
|
|||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||
pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step;
|
||||
|
||||
if (pTableQueryInfo->pTable == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo);
|
||||
STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid));
|
||||
if (idinfo != NULL) {
|
||||
|
@ -4880,10 +4890,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
|
|||
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
|
||||
|
||||
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
if (pTableQueryInfo != NULL) { // TODO refactor
|
||||
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
|
||||
}
|
||||
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
|
||||
|
||||
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
|
||||
|
@ -4924,10 +4931,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
|
|||
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
|
||||
|
||||
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
if (pTableQueryInfo != NULL) { // TODO refactor
|
||||
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
|
||||
}
|
||||
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
|
||||
|
||||
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) {
|
||||
|
|
Loading…
Reference in New Issue