[td-13039] fix bug in show db.
This commit is contained in:
parent
4e75f88f82
commit
d6af35adb8
|
@ -1346,7 +1346,7 @@ static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t
|
||||||
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
|
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity) {
|
static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity, int64_t numOfTables) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
@ -1367,7 +1367,7 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
*(int64_t *)pWrite = 0; // todo: num of Tables
|
*(int64_t *)pWrite = numOfTables;
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
@ -1447,6 +1447,18 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_
|
||||||
*(int8_t *)pWrite = pDb->cfg.update;
|
*(int8_t *)pWrite = pDb->cfg.update;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setInformationSchemaDbCfg(SDbObj* pDbObj) {
|
||||||
|
ASSERT(pDbObj != NULL);
|
||||||
|
strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
|
||||||
|
|
||||||
|
pDbObj->createdTime = 0;
|
||||||
|
pDbObj->cfg.numOfVgroups = 0;
|
||||||
|
pDbObj->cfg.quorum = 1;
|
||||||
|
pDbObj->cfg.replications = 1;
|
||||||
|
pDbObj->cfg.update = 1;
|
||||||
|
pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
|
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1459,14 +1471,18 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity);
|
dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity, 0);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append the information_schema database into the result.
|
// Append the information_schema database into the result.
|
||||||
|
if (numOfRows < rowsCapacity) {
|
||||||
|
SDbObj dummyISDb = {0};
|
||||||
|
setInformationSchemaDbCfg(&dummyISDb);
|
||||||
|
dumpDbInfoToPayload(data, &dummyISDb, pShow, numOfRows, rowsCapacity, 14);
|
||||||
|
numOfRows += 1;
|
||||||
|
}
|
||||||
|
|
||||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow);
|
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow);
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
|
|
@ -355,28 +355,6 @@ typedef struct SQInfo {
|
||||||
STaskCostInfo summary;
|
STaskCostInfo summary;
|
||||||
} SQInfo;
|
} SQInfo;
|
||||||
|
|
||||||
typedef struct STaskParam {
|
|
||||||
char* sql;
|
|
||||||
char* tagCond;
|
|
||||||
char* colCond;
|
|
||||||
char* tbnameCond;
|
|
||||||
char* prevResult;
|
|
||||||
SArray* pTableIdList;
|
|
||||||
SExprBasicInfo** pExpr;
|
|
||||||
SExprBasicInfo** pSecExpr;
|
|
||||||
SExprInfo* pExprs;
|
|
||||||
SExprInfo* pSecExprs;
|
|
||||||
|
|
||||||
SFilterInfo* pFilters;
|
|
||||||
|
|
||||||
SColIndex* pGroupColIndex;
|
|
||||||
SColumnInfo* pTagColumnInfo;
|
|
||||||
SGroupbyExpr* pGroupbyExpr;
|
|
||||||
int32_t tableScanOperator;
|
|
||||||
SArray* pOperator;
|
|
||||||
struct SUdfInfo* pUdfInfo;
|
|
||||||
} STaskParam;
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
DATA_NOT_READY = 0x1,
|
DATA_NOT_READY = 0x1,
|
||||||
DATA_READY = 0x2,
|
DATA_READY = 0x2,
|
||||||
|
@ -444,13 +422,6 @@ typedef struct SStreamBlockScanInfo {
|
||||||
void* readerHandle; // stream block reader handle
|
void* readerHandle; // stream block reader handle
|
||||||
} SStreamBlockScanInfo;
|
} SStreamBlockScanInfo;
|
||||||
|
|
||||||
|
|
||||||
typedef struct SSysScanResInfo {
|
|
||||||
struct SSysTableScanInfo *pSysScanInfo;
|
|
||||||
SRetrieveTableRsp *pRsp;
|
|
||||||
uint64_t totalRows;
|
|
||||||
} SSysScanResInfo;
|
|
||||||
|
|
||||||
typedef struct SSysTableScanInfo {
|
typedef struct SSysTableScanInfo {
|
||||||
union {
|
union {
|
||||||
void* pTransporter;
|
void* pTransporter;
|
||||||
|
@ -458,18 +429,15 @@ typedef struct SSysTableScanInfo {
|
||||||
};
|
};
|
||||||
|
|
||||||
SRetrieveMetaTableRsp *pRsp;
|
SRetrieveMetaTableRsp *pRsp;
|
||||||
|
void *pCur; // cursor
|
||||||
void *pCur; // cursor
|
SRetrieveTableReq req;
|
||||||
SRetrieveTableReq req;
|
SEpSet epSet;
|
||||||
SEpSet epSet;
|
int32_t type; // show type
|
||||||
int32_t type; // show type
|
tsem_t ready;
|
||||||
tsem_t ready;
|
SSchema* pSchema;
|
||||||
SSchema* pSchema;
|
SSDataBlock* pRes;
|
||||||
SSDataBlock* pRes;
|
int32_t capacity;
|
||||||
|
int64_t numOfBlocks; // extract basic running information.
|
||||||
int32_t capacity;
|
|
||||||
int64_t numOfBlocks; // extract basic running information.
|
|
||||||
int64_t totalRows;
|
|
||||||
SLoadRemoteDataInfo loadInfo;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
} SSysTableScanInfo;
|
} SSysTableScanInfo;
|
||||||
|
|
||||||
|
@ -690,8 +658,6 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
// SSDataBlock* doSLimit(void* param, bool* newgroup);
|
// SSDataBlock* doSLimit(void* param, bool* newgroup);
|
||||||
|
|
||||||
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
|
|
||||||
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
|
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
|
||||||
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
|
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
|
||||||
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
||||||
|
@ -709,9 +675,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
|
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
|
||||||
|
|
||||||
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
|
|
||||||
int32_t prevResultLen, void* merger);
|
|
||||||
|
|
||||||
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
|
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
|
||||||
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
|
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
|
||||||
|
|
||||||
|
@ -723,11 +686,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg);
|
||||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||||
int32_t checkForQueryBuf(size_t numOfTables);
|
int32_t checkForQueryBuf(size_t numOfTables);
|
||||||
bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
|
bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
|
||||||
void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status);
|
|
||||||
|
|
||||||
int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen);
|
|
||||||
|
|
||||||
size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows);
|
|
||||||
void setTaskKilled(SExecTaskInfo* pTaskInfo);
|
void setTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
|
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
|
||||||
|
@ -737,8 +696,6 @@ void calculateOperatorProfResults(SQInfo* pQInfo);
|
||||||
void queryCostStatis(SExecTaskInfo* pTaskInfo);
|
void queryCostStatis(SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||||
void freeQueryAttr(STaskAttr* pQuery);
|
|
||||||
|
|
||||||
int32_t getMaximumIdleDurationSec();
|
int32_t getMaximumIdleDurationSec();
|
||||||
|
|
||||||
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
|
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
|
||||||
|
|
|
@ -4965,7 +4965,6 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo*
|
||||||
|
|
||||||
size_t len = numOfRows * pColInfoData->info.bytes;
|
size_t len = numOfRows * pColInfoData->info.bytes;
|
||||||
memcpy(tmp, pData, len);
|
memcpy(tmp, pData, len);
|
||||||
|
|
||||||
pColInfoData->pData = tmp;
|
pColInfoData->pData = tmp;
|
||||||
pData += len;
|
pData += len;
|
||||||
}
|
}
|
||||||
|
@ -4982,7 +4981,6 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
pLoadInfo->totalElapsed += el;
|
pLoadInfo->totalElapsed += el;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5452,7 +5450,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->totalRows += numOfRows;
|
pInfo->loadInfo.totalRows += numOfRows;
|
||||||
pInfo->pRes->info.rows = numOfRows;
|
pInfo->pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
// pInfo->elapsedTime;
|
// pInfo->elapsedTime;
|
||||||
|
@ -5482,7 +5480,6 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
||||||
|
|
||||||
tsem_wait(&pInfo->ready);
|
tsem_wait(&pInfo->ready);
|
||||||
|
|
||||||
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
||||||
|
@ -5495,9 +5492,8 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
|
||||||
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
|
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
|
||||||
setSDataBlockFromFetchRsp(pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
|
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
|
||||||
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
|
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
|
||||||
|
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
|
@ -5565,64 +5561,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
|
|
||||||
assert(pTableScanInfo != NULL && pDownstream != NULL);
|
|
||||||
|
|
||||||
pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr
|
|
||||||
pTableScanInfo->numOfOutput = pDownstream->numOfOutput;
|
|
||||||
#if 0
|
|
||||||
if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) {
|
|
||||||
SAggOperatorInfo* pAggInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
|
||||||
} else if (pDownstream->operatorType == OP_TimeWindow || pDownstream->operatorType == OP_AllTimeWindow) {
|
|
||||||
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
|
|
||||||
|
|
||||||
} else if (pDownstream->operatorType == OP_Groupby) {
|
|
||||||
SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
|
|
||||||
|
|
||||||
} else if (pDownstream->operatorType == OP_MultiTableTimeInterval || pDownstream->operatorType == OP_AllMultiTableTimeInterval) {
|
|
||||||
STableIntervalOperatorInfo *pInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pInfo->pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
|
|
||||||
|
|
||||||
} else if (pDownstream->operatorType == OP_Project) {
|
|
||||||
SProjectOperatorInfo *pInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
|
||||||
} else if (pDownstream->operatorType == OP_SessionWindow) {
|
|
||||||
SSWindowOperatorInfo* pInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
|
||||||
} else if (pDownstream->operatorType == OP_StateWindow) {
|
|
||||||
SStateWindowOperatorInfo* pInfo = pDownstream->info;
|
|
||||||
|
|
||||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
|
||||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
|
||||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
|
||||||
} else {
|
|
||||||
assert(0);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* getOrderCheckColumns(STaskAttr* pQuery) {
|
SArray* getOrderCheckColumns(STaskAttr* pQuery) {
|
||||||
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
|
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
|
||||||
|
|
||||||
|
@ -7995,37 +7933,6 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SExprBasicInfo *pExpr
|
||||||
return j != INT32_MIN;
|
return j != INT32_MIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SExprBasicInfo** pExpr, int32_t numOfOutput,
|
|
||||||
SColumnInfo* pTagCols, void* pMsg) {
|
|
||||||
int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags;
|
|
||||||
if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) {
|
|
||||||
//qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pMsg, pTableInfo->numOfCols, pTableInfo->numOfTags);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numOfTotal == 0) { // table total columns are not required.
|
|
||||||
// for(int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
// SExprBasicInfo* p = pExpr[i];
|
|
||||||
// if ((p->functionId == FUNCTION_TAGPRJ) ||
|
|
||||||
// (p->functionId == FUNCTION_TID_TAG && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
|
|
||||||
// (p->functionId == FUNCTION_COUNT && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
|
|
||||||
// (p->functionId == FUNCTION_BLKINFO)) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
if (!validateExprColumnInfo(pTableInfo, pExpr[i], pTagCols)) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_MSG;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) {
|
static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) {
|
||||||
for (int32_t f = 0; f < numOfFilters; ++f) {
|
for (int32_t f = 0; f < numOfFilters; ++f) {
|
||||||
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg);
|
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg);
|
||||||
|
@ -8060,10 +7967,10 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
|
||||||
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) {
|
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) {
|
||||||
SResSchema s = {0};
|
SResSchema s = {0};
|
||||||
s.scale = scale;
|
s.scale = scale;
|
||||||
s.precision = precision;
|
|
||||||
s.type = type;
|
s.type = type;
|
||||||
s.bytes = bytes;
|
s.bytes = bytes;
|
||||||
s.colId = slotId;
|
s.colId = slotId;
|
||||||
|
s.precision = precision;
|
||||||
strncpy(s.name, name, tListLen(s.name));
|
strncpy(s.name, name, tListLen(s.name));
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
|
@ -8179,7 +8086,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
||||||
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
|
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo);
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo);
|
||||||
|
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
||||||
|
@ -8573,75 +8479,6 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
|
||||||
pResultInfo->total = 0;
|
pResultInfo->total = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
|
|
||||||
return ((SQInfo *)qHandle)->qId == qId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
|
|
||||||
int32_t prevResultLen, void* merger) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
||||||
pRuntimeEnv->qinfo = pQInfo;
|
|
||||||
|
|
||||||
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
|
|
||||||
STSBuf *pTsBuf = NULL;
|
|
||||||
|
|
||||||
if (pTsBufInfo->tsLen > 0) { // open new file to save the result
|
|
||||||
char* tsBlock = start + pTsBufInfo->tsOffset;
|
|
||||||
pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder,
|
|
||||||
pQueryAttr->vgId);
|
|
||||||
|
|
||||||
if (pTsBuf == NULL) {
|
|
||||||
code = TSDB_CODE_QRY_NO_DISKSPACE;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
tsBufResetPos(pTsBuf);
|
|
||||||
bool ret = tsBufNextPos(pTsBuf);
|
|
||||||
UNUSED(ret);
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* prevResult = NULL;
|
|
||||||
if (prevResultLen > 0) {
|
|
||||||
prevResult = interResFromBinary(param->prevResult, prevResultLen);
|
|
||||||
pRuntimeEnv->prevResult = prevResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRuntimeEnv->currentOffset = pQueryAttr->limit.offset;
|
|
||||||
if (tsdb != NULL) {
|
|
||||||
// pQueryAttr->precision = tsdbGetCfg(tsdb)->precision;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.skey > pQueryAttr->window.ekey)) ||
|
|
||||||
(!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) {
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey,
|
|
||||||
// pQueryAttr->window.ekey, pQueryAttr->order.order);
|
|
||||||
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
|
|
||||||
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
|
|
||||||
// todo free memory
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
|
|
||||||
//qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId);
|
|
||||||
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// filter the qualified
|
|
||||||
if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
|
|
||||||
_error:
|
|
||||||
// table query ref will be decrease during error handling
|
|
||||||
// doDestroyTask(pQInfo);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO refactor
|
//TODO refactor
|
||||||
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
|
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
|
||||||
if (pFilter == NULL || numOfFilters == 0) {
|
if (pFilter == NULL || numOfFilters == 0) {
|
||||||
|
|
|
@ -56,7 +56,7 @@ protected:
|
||||||
const string syntaxTreeStr = toString(query_->pRoot, false);
|
const string syntaxTreeStr = toString(query_->pRoot, false);
|
||||||
|
|
||||||
SLogicNode* pLogicPlan = nullptr;
|
SLogicNode* pLogicPlan = nullptr;
|
||||||
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
|
SPlanContext cxt = { .queryId = 1, .acctId = 0, .mgmtEpSet = {0}, .pAstRoot = query_->pRoot };
|
||||||
code = createLogicPlan(&cxt, &pLogicPlan);
|
code = createLogicPlan(&cxt, &pLogicPlan);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||||
|
|
Loading…
Reference in New Issue