[td-225]fix the bug found by crash_gen
This commit is contained in:
parent
b671d723b0
commit
35f074927f
|
@ -447,18 +447,22 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
|||
}
|
||||
|
||||
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
return;
|
||||
} else {
|
||||
assert(code == TSDB_CODE_SUCCESS);
|
||||
assert(code == TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
(*pSql->fp)(pSql->param, pSql, code);
|
||||
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
|
||||
tscHandleMultivnodeInsert(pSql);
|
||||
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
|
||||
tscImportDataFromFile(pSql);
|
||||
} else {
|
||||
tscHandleMultivnodeInsert(pSql);
|
||||
}
|
||||
} else {
|
||||
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
|
||||
executeQuery(pSql, pQueryInfo1);
|
||||
|
|
|
@ -1527,6 +1527,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
|
||||
pCmd->active = pCmd->pQueryInfo[0];
|
||||
|
||||
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
|
||||
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
|
||||
|
|
|
@ -3543,6 +3543,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
||||
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
||||
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
||||
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
|
||||
|
||||
pQueryAttr->numOfCols = numOfCols;
|
||||
pQueryAttr->numOfOutput = numOfOutput;
|
||||
|
|
|
@ -195,6 +195,7 @@ typedef struct SQueryAttr {
|
|||
bool simpleAgg;
|
||||
bool pointInterpQuery; // point interpolation query
|
||||
bool needReverseScan; // need reverse scan
|
||||
bool distinctTag; // distinct tag query
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
|
||||
int32_t havingNum; // having expr number
|
||||
|
@ -297,9 +298,10 @@ enum OPERATOR_TYPE_E {
|
|||
OP_MultiTableAggregate = 14,
|
||||
OP_MultiTableTimeInterval = 15,
|
||||
OP_DummyInput = 16, //TODO remove it after fully refactor.
|
||||
OP_MultiwaySort = 17, // multi-way data merge into one input stream.
|
||||
OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
|
||||
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
|
||||
OP_Filter = 19,
|
||||
OP_Filter = 19,
|
||||
OP_Distinct = 20,
|
||||
};
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
|
@ -463,6 +465,14 @@ typedef struct SSWindowOperatorInfo {
|
|||
int32_t start; // start row index
|
||||
} SSWindowOperatorInfo;
|
||||
|
||||
typedef struct SDistinctOperatorInfo {
|
||||
SHashObj *pSet;
|
||||
SSDataBlock *pRes;
|
||||
bool recordNullVal; //has already record the null value, no need to try again
|
||||
int64_t threshold;
|
||||
int64_t outputCapacity;
|
||||
} SDistinctOperatorInfo;
|
||||
|
||||
struct SLocalMerger;
|
||||
|
||||
typedef struct SMultiwayMergeInfo {
|
||||
|
@ -498,6 +508,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
|
|||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
|
||||
int32_t numOfRows, void* merger, bool groupMix);
|
||||
|
|
|
@ -1780,7 +1780,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
break;
|
||||
}
|
||||
|
||||
case OP_MultiwaySort: {
|
||||
case OP_MultiwayMergeSort: {
|
||||
bool groupMix = true;
|
||||
if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
|
||||
groupMix = false;
|
||||
|
@ -1791,15 +1791,19 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
|
||||
case OP_GlobalAggregate: {
|
||||
pRuntimeEnv->proot =
|
||||
createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, merger);
|
||||
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, merger);
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_SLimit: {
|
||||
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
|
||||
pQueryAttr->numOfExpr3, merger);
|
||||
pQueryAttr->numOfExpr3, merger);
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_Distinct: {
|
||||
pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -4621,7 +4625,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
|
|||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "MultiwaySortOperator";
|
||||
pOperator->operatorType = OP_MultiwaySort;
|
||||
pOperator->operatorType = OP_MultiwayMergeSort;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
|
@ -5338,6 +5342,12 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
|
||||
}
|
||||
|
||||
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param;
|
||||
taosHashCleanup(pInfo->pSet);
|
||||
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
|
||||
|
||||
|
@ -5641,7 +5651,6 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -5793,6 +5802,88 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SDistinctOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
|
||||
pRes->info.rows = 0;
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while(1) {
|
||||
pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert(pBlock->info.numOfCols == 1);
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
int16_t type = pColInfoData->info.type;
|
||||
|
||||
// ensure the output buffer size
|
||||
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0);
|
||||
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
|
||||
int32_t newSize = pRes->info.rows + pBlock->info.rows;
|
||||
char* tmp = realloc(pResultColInfoData->pData, newSize * bytes);
|
||||
if (tmp == NULL) {
|
||||
return NULL;
|
||||
} else {
|
||||
pResultColInfoData->pData = tmp;
|
||||
pInfo->outputCapacity = newSize;
|
||||
}
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
char* val = ((char*)pColInfoData->pData) + bytes * i;
|
||||
if (isNull(val, type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
void* res = taosHashGet(pInfo->pSet, val, bytes);
|
||||
if (res == NULL) {
|
||||
taosHashPut(pInfo->pSet, val, bytes, NULL, 0);
|
||||
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
|
||||
memcpy(start, val, bytes);
|
||||
pRes->info.rows += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pRes->info.rows >= pInfo->threshold) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
|
||||
|
||||
pInfo->outputCapacity = 4096;
|
||||
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK);
|
||||
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->outputCapacity);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "DistinctOperator";
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->operatorType = OP_Distinct;
|
||||
pOperator->upstream = upstream;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->exec = hashDistinct;
|
||||
pOperator->cleanup = destroyDistinctOperatorInfo;
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) {
|
||||
int32_t j = 0;
|
||||
|
||||
|
|
|
@ -49,17 +49,20 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
|
|||
int32_t op = 0;
|
||||
if (onlyQueryTags(pQueryAttr)) {
|
||||
// op = OP_TagScan;
|
||||
} else if (pQueryAttr->queryBlockDist) {
|
||||
op = OP_TableBlockInfoScan;
|
||||
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
|
||||
op = OP_TableSeqScan;
|
||||
} else if (pQueryAttr->needReverseScan) {
|
||||
op = OP_DataBlocksOptScan;
|
||||
} else {
|
||||
op = OP_TableScan;
|
||||
if (pQueryAttr->queryBlockDist) {
|
||||
op = OP_TableBlockInfoScan;
|
||||
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
|
||||
op = OP_TableSeqScan;
|
||||
} else if (pQueryAttr->needReverseScan) {
|
||||
op = OP_DataBlocksOptScan;
|
||||
} else {
|
||||
op = OP_TableScan;
|
||||
}
|
||||
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
taosArrayPush(plan, &op);
|
||||
return plan;
|
||||
}
|
||||
|
||||
|
@ -70,6 +73,10 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
|
||||
op = OP_TagScan;
|
||||
taosArrayPush(plan, &op);
|
||||
if (pQueryAttr->distinctTag) {
|
||||
op = OP_Distinct;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
} else if (pQueryAttr->interval.interval > 0) {
|
||||
if (pQueryAttr->stableQuery) {
|
||||
op = OP_MultiTableTimeInterval;
|
||||
|
@ -148,10 +155,14 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
|
|||
return plan;
|
||||
}
|
||||
|
||||
// todo:
|
||||
int32_t op = OP_MultiwaySort;
|
||||
int32_t op = OP_MultiwayMergeSort;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->distinctTag) {
|
||||
op = OP_Distinct;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
|
||||
if (pQueryAttr->simpleAgg || (pQueryAttr->interval.interval > 0 || pQueryAttr->sw.gap > 0)) {
|
||||
op = OP_GlobalAggregate;
|
||||
taosArrayPush(plan, &op);
|
||||
|
|
|
@ -17,7 +17,7 @@ run general/parser/first_last.sim
|
|||
run general/parser/import_commit1.sim
|
||||
run general/parser/import_commit2.sim
|
||||
run general/parser/import_commit3.sim
|
||||
#run general/parser/import_file.sim
|
||||
run general/parser/import_file.sim
|
||||
run general/parser/insert_tb.sim
|
||||
run general/parser/tags_dynamically_specifiy.sim
|
||||
run general/parser/interp.sim
|
||||
|
@ -43,6 +43,7 @@ run general/parser/join_multivnode.sim
|
|||
run general/parser/join_manyblocks.sim
|
||||
run general/parser/projection_limit_offset.sim
|
||||
run general/parser/select_with_tags.sim
|
||||
run general/parser/select_distinct_tag.sim
|
||||
run general/parser/groupby.sim
|
||||
run general/parser/tags_filter.sim
|
||||
run general/parser/topbot.sim
|
||||
|
|
Loading…
Reference in New Issue