[td-255] fix bug found by regression test.
This commit is contained in:
parent
86b00102d8
commit
e02653fb71
|
@ -7493,6 +7493,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set order by info
|
||||||
|
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||||
|
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMeta)) !=
|
||||||
|
TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||||
|
|
||||||
|
@ -7642,8 +7649,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
||||||
|
|
||||||
SExprInfo** p = NULL;
|
SExprInfo** p = NULL;
|
||||||
int32_t numOfExpr = 0;
|
int32_t numOfExpr = 0;
|
||||||
|
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
code = createProjectionExpr(pQueryInfo, pTableMetaInfo, &p, &numOfExpr);
|
code = createProjectionExpr(pQueryInfo, pTableMetaInfo, &p, &numOfExpr);
|
||||||
|
|
||||||
if (pQueryInfo->exprList1 == NULL) {
|
if (pQueryInfo->exprList1 == NULL) {
|
||||||
pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES);
|
pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
|
@ -777,7 +777,9 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
|
||||||
SSqlRes* pRes = &pSql->res;
|
SSqlRes* pRes = &pSql->res;
|
||||||
|
|
||||||
SSDataBlock* pBlock = pInput->block;
|
SSDataBlock* pBlock = pInput->block;
|
||||||
pOperator->pRuntimeEnv->current = pInput->pTableQueryInfo;
|
if (pOperator->pRuntimeEnv != NULL) {
|
||||||
|
pOperator->pRuntimeEnv->current = pInput->pTableQueryInfo;
|
||||||
|
}
|
||||||
|
|
||||||
pBlock->info.rows = pRes->numOfRows;
|
pBlock->info.rows = pRes->numOfRows;
|
||||||
if (pRes->numOfRows != 0) {
|
if (pRes->numOfRows != 0) {
|
||||||
|
@ -801,6 +803,24 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
|
||||||
|
SJoinStatus* pStatus = &pJoinInfo->status[i];
|
||||||
|
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
|
||||||
|
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
|
||||||
|
pStatus->index = 0;
|
||||||
|
|
||||||
|
if (pStatus->pBlock == NULL) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
|
SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
|
||||||
SOperatorInfo *pOperator = (SOperatorInfo*) param;
|
SOperatorInfo *pOperator = (SOperatorInfo*) param;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -813,19 +833,9 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
|
||||||
pJoinInfo->pRes->info.rows = 0;
|
pJoinInfo->pRes->info.rows = 0;
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
|
fetchNextBlockIfCompleted(pOperator, newgroup);
|
||||||
SJoinStatus* pStatus = &pJoinInfo->status[i];
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
|
return pJoinInfo->pRes;
|
||||||
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
|
|
||||||
pStatus->index = 0;
|
|
||||||
|
|
||||||
if (pStatus->pBlock == NULL) {
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
|
|
||||||
pJoinInfo->resultInfo.total += pJoinInfo->pRes->info.rows;
|
|
||||||
return pJoinInfo->pRes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SJoinStatus* st0 = &pJoinInfo->status[0];
|
SJoinStatus* st0 = &pJoinInfo->status[0];
|
||||||
|
@ -844,8 +854,12 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
|
||||||
|
|
||||||
if (ts[st->index] < ts0[st0->index]) { // less than the first
|
if (ts[st->index] < ts0[st0->index]) { // less than the first
|
||||||
prefixEqual = false;
|
prefixEqual = false;
|
||||||
|
|
||||||
if ((++(st->index)) >= st->pBlock->info.rows) {
|
if ((++(st->index)) >= st->pBlock->info.rows) {
|
||||||
break;
|
fetchNextBlockIfCompleted(pOperator, newgroup);
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return pJoinInfo->pRes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (ts[st->index] > ts0[st0->index]) { // greater than the first;
|
} else if (ts[st->index] > ts0[st0->index]) { // greater than the first;
|
||||||
if (prefixEqual == true) {
|
if (prefixEqual == true) {
|
||||||
|
@ -853,12 +867,19 @@ SSDataBlock* doDataBlockJoin(void* param, bool* newgroup) {
|
||||||
for (int32_t j = 0; j < i; ++j) {
|
for (int32_t j = 0; j < i; ++j) {
|
||||||
SJoinStatus* stx = &pJoinInfo->status[j];
|
SJoinStatus* stx = &pJoinInfo->status[j];
|
||||||
if ((++(stx->index)) >= stx->pBlock->info.rows) {
|
if ((++(stx->index)) >= stx->pBlock->info.rows) {
|
||||||
break;
|
|
||||||
|
fetchNextBlockIfCompleted(pOperator, newgroup);
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return pJoinInfo->pRes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if ((++(st0->index)) >= st0->pBlock->info.rows) {
|
if ((++(st0->index)) >= st0->pBlock->info.rows) {
|
||||||
break;
|
fetchNextBlockIfCompleted(pOperator, newgroup);
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return pJoinInfo->pRes;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1129,6 +1150,19 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
|
||||||
memcpy(schema, pSchema, numOfCol1*sizeof(SSchema));
|
memcpy(schema, pSchema, numOfCol1*sizeof(SSchema));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update the exprinfo
|
||||||
|
int32_t numOfOutput = (int32_t)tscNumOfExprs(px);
|
||||||
|
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
SExprInfo* pex = taosArrayGetP(px->exprList, i);
|
||||||
|
int32_t colId = pex->base.colInfo.colId;
|
||||||
|
for(int32_t j = 0; j < pSourceOperator->numOfOutput; ++j) {
|
||||||
|
if (colId == schema[j].colId) {
|
||||||
|
pex->base.colInfo.colIndex = j;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
|
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
|
||||||
tfree(pColumnInfo);
|
tfree(pColumnInfo);
|
||||||
tfree(schema);
|
tfree(schema);
|
||||||
|
|
|
@ -2489,7 +2489,6 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
|
||||||
tmp += POINTER_BYTES * pCtx->param[0].i64;
|
tmp += POINTER_BYTES * pCtx->param[0].i64;
|
||||||
|
|
||||||
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
|
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
|
||||||
// assert(pCtx->param[0].i64 > 0);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
|
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
|
||||||
pTopBotInfo->res[i] = (tValuePair*) tmp;
|
pTopBotInfo->res[i] = (tValuePair*) tmp;
|
||||||
|
@ -2498,7 +2497,6 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval) {
|
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval) {
|
||||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||||
if (pResInfo == NULL) {
|
if (pResInfo == NULL) {
|
||||||
|
@ -2742,7 +2740,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
|
||||||
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||||
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
|
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
|
||||||
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
|
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
|
||||||
} else if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
} else /*if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ {
|
||||||
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
|
__compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
|
||||||
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
|
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue