[TD-225]1. add sql string in qhandle, 2. enable column check during convert query message.
This commit is contained in:
parent
945e346655
commit
7d9ad2e9d4
|
@ -564,7 +564,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
/*
|
||||
* for table query, simply return the size <= 1k
|
||||
*/
|
||||
static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
||||
static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
|
||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
|
||||
|
||||
|
@ -574,6 +576,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
|||
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
|
||||
|
||||
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
|
||||
int32_t sqlLen = strlen(pSql->sqlstr) + 1;
|
||||
|
||||
int32_t tableSerialize = 0;
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
@ -590,7 +593,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) {
|
|||
}
|
||||
|
||||
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
|
||||
tableSerialize + 4096;
|
||||
tableSerialize + sqlLen + 4096;
|
||||
}
|
||||
|
||||
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
|
||||
|
@ -670,7 +673,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
|||
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
int32_t size = tscEstimateQueryMsgSize(pCmd, pCmd->clauseIndex);
|
||||
int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex);
|
||||
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
|
||||
tscError("%p failed to malloc for query msg", pSql);
|
||||
|
@ -703,7 +706,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
|
||||
|
||||
int32_t numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
|
||||
|
||||
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr);
|
||||
|
||||
if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
|
||||
pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey);
|
||||
pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey);
|
||||
|
@ -726,10 +730,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->interval.offsetUnit = pQueryInfo->interval.offsetUnit;
|
||||
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
|
||||
pQueryMsg->tagCondLen = htons((pQueryInfo->tagCond.tbnameCond.cond != NULL)? strlen(pQueryInfo->tagCond.tbnameCond.cond):0);
|
||||
pQueryMsg->numOfTags = htonl(numOfTags);
|
||||
pQueryMsg->queryType = htonl(pQueryInfo->type);
|
||||
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
|
||||
|
||||
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
|
||||
pQueryMsg->sqlstrLen = htonl(sqlLen);
|
||||
|
||||
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
|
||||
|
||||
|
@ -964,8 +970,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
|
||||
if (pQueryInfo->tagCond.tbnameCond.cond == NULL) {
|
||||
*pMsg = 0;
|
||||
pMsg++;
|
||||
assert(pQueryMsg->tagCondLen == 0);
|
||||
} else {
|
||||
strcpy(pMsg, pQueryInfo->tagCond.tbnameCond.cond);
|
||||
pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1;
|
||||
|
@ -989,6 +994,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
|
||||
}
|
||||
|
||||
memcpy(pMsg, pSql->sqlstr, sqlLen);
|
||||
pMsg += sqlLen;
|
||||
|
||||
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
|
||||
|
||||
tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
|
||||
|
|
|
@ -1995,7 +1995,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
|
||||
pNew->pTscObj = pSql->pTscObj;
|
||||
pNew->signature = pNew;
|
||||
pNew->sqlstr = NULL;
|
||||
pNew->sqlstr = strdup(pSql->sqlstr);
|
||||
|
||||
SSqlCmd* pnCmd = &pNew->cmd;
|
||||
memcpy(pnCmd, pCmd, sizeof(SSqlCmd));
|
||||
|
|
|
@ -494,6 +494,7 @@ typedef struct {
|
|||
int32_t tsNumOfBlocks; // ts comp block numbers
|
||||
int32_t tsOrder; // ts comp block order
|
||||
int32_t numOfTags; // number of tags columns involved
|
||||
int32_t sqlstrLen; // sql query string
|
||||
SColumnInfo colList[];
|
||||
} SQueryTableMsg;
|
||||
|
||||
|
|
|
@ -239,6 +239,7 @@ typedef struct SQInfo {
|
|||
int32_t dataReady; // denote if query result is ready or not
|
||||
void* rspContext; // response context
|
||||
int64_t startExecTs; // start to exec timestamp
|
||||
char* sql; // query sql string
|
||||
} SQInfo;
|
||||
|
||||
#endif // TDENGINE_QUERYEXECUTOR_H
|
||||
|
|
|
@ -5911,13 +5911,13 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE
|
|||
j += 1;
|
||||
}
|
||||
}
|
||||
assert(0);
|
||||
return -1;
|
||||
|
||||
return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value
|
||||
}
|
||||
|
||||
bool validateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
|
||||
int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg, pTagCols);
|
||||
return j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags;
|
||||
return j != INT32_MIN;
|
||||
}
|
||||
|
||||
static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
|
||||
|
@ -5944,12 +5944,14 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pExprMsg) {
|
||||
static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) {
|
||||
int32_t numOfTotal = pQueryMsg->numOfCols + pQueryMsg->numOfTags;
|
||||
if (pQueryMsg->numOfCols < 0 || pQueryMsg->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) {
|
||||
qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pQueryMsg, pQueryMsg->numOfCols, pQueryMsg->numOfTags);
|
||||
return false;
|
||||
} else if (numOfTotal == 0) {
|
||||
}
|
||||
|
||||
if (numOfTotal == 0) {
|
||||
for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
|
||||
SSqlFuncMsg* pFuncMsg = pExprMsg[i];
|
||||
|
||||
|
@ -5963,6 +5965,12 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
|
|||
}
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
|
||||
if (!validateExprColumnInfo(pQueryMsg, pExprMsg[i], pTagCols)) {
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -5994,7 +6002,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
|
|||
* @return
|
||||
*/
|
||||
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, SSqlFuncMsg ***pSecStageExpr,
|
||||
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
|
||||
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols, char** sql) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) {
|
||||
|
@ -6026,7 +6034,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
|
||||
pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder);
|
||||
pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags);
|
||||
pQueryMsg->tagCondLen = htonl(pQueryMsg->tagCondLen);
|
||||
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
|
||||
pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen);
|
||||
|
||||
// query msg safety check
|
||||
if (!validateQueryMsg(pQueryMsg)) {
|
||||
|
@ -6121,20 +6131,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||
goto _cleanup;
|
||||
}
|
||||
} else {
|
||||
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
|
||||
// return TSDB_CODE_QRY_INVALID_MSG;
|
||||
// }
|
||||
}
|
||||
|
||||
pExprMsg = (SSqlFuncMsg *)pMsg;
|
||||
}
|
||||
|
||||
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
|
||||
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
if (pQueryMsg->secondStageOutput) {
|
||||
pExprMsg = (SSqlFuncMsg *)pMsg;
|
||||
*pSecStageExpr = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES);
|
||||
|
@ -6168,10 +6169,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||
goto _cleanup;
|
||||
}
|
||||
} else {
|
||||
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
|
||||
// return TSDB_CODE_QRY_INVALID_MSG;
|
||||
// }
|
||||
}
|
||||
|
||||
pExprMsg = (SSqlFuncMsg *)pMsg;
|
||||
|
@ -6250,17 +6247,22 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
pMsg += pQueryMsg->tagCondLen;
|
||||
}
|
||||
|
||||
if (*pMsg != 0) {
|
||||
size_t len = strlen(pMsg) + 1;
|
||||
|
||||
*tbnameCond = malloc(len);
|
||||
if (pQueryMsg->tagCondLen != 0) {
|
||||
*tbnameCond = calloc(1, pQueryMsg->tagCondLen + 1);
|
||||
if (*tbnameCond == NULL) {
|
||||
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
strcpy(*tbnameCond, pMsg);
|
||||
pMsg += len;
|
||||
strncpy(*tbnameCond, pMsg, pQueryMsg->tagCondLen);
|
||||
pMsg += pQueryMsg->tagCondLen;
|
||||
}
|
||||
|
||||
*sql = strndup(pMsg, pQueryMsg->sqlstrLen);
|
||||
|
||||
if (!validateQuerySourceCols(pQueryMsg, *pExpr, *tagCols)) {
|
||||
code = TSDB_CODE_QRY_INVALID_MSG;
|
||||
goto _cleanup;
|
||||
}
|
||||
|
||||
qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
|
||||
|
@ -6269,6 +6271,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
|
|||
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->interval.interval,
|
||||
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
|
||||
|
||||
qDebug("qmsg:%p, sql:%s", pQueryMsg, *sql);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_cleanup:
|
||||
|
@ -6279,6 +6282,7 @@ _cleanup:
|
|||
tfree(*groupbyCols);
|
||||
tfree(*tagCols);
|
||||
tfree(*tagCond);
|
||||
tfree(*sql);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -6351,7 +6355,15 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
|
|||
}
|
||||
} else {
|
||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags);
|
||||
if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
|
||||
if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
} else {
|
||||
if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) {
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
}
|
||||
|
||||
if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) {
|
||||
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
|
||||
|
@ -6375,6 +6387,7 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
|
|||
if (pExprs[i].base.functionId == TSDB_FUNC_TAG_DUMMY || pExprs[i].base.functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
tagLen += pExprs[i].bytes;
|
||||
}
|
||||
|
||||
assert(isValidDataType(pExprs[i].type));
|
||||
}
|
||||
|
||||
|
@ -6577,7 +6590,7 @@ static void calResultBufSize(SQuery* pQuery) {
|
|||
}
|
||||
|
||||
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
|
||||
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery) {
|
||||
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql) {
|
||||
int16_t numOfCols = pQueryMsg->numOfCols;
|
||||
int16_t numOfOutput = pQueryMsg->numOfOutput;
|
||||
|
||||
|
@ -6708,6 +6721,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
|||
pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
||||
pQInfo->rspContext = NULL;
|
||||
pQInfo->sql = sql;
|
||||
pthread_mutex_init(&pQInfo->lock, NULL);
|
||||
tsem_init(&pQInfo->ready, 0, 0);
|
||||
|
||||
|
@ -6951,6 +6965,8 @@ static void freeQInfo(SQInfo *pQInfo) {
|
|||
doDestroyTableQueryInfo(&pQInfo->tableqinfoGroupInfo);
|
||||
|
||||
tfree(pQInfo->pBuf);
|
||||
tfree(pQInfo->sql);
|
||||
|
||||
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
|
||||
taosHashCleanup(pQInfo->arrTableIdInfo);
|
||||
|
||||
|
@ -7049,6 +7065,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
char *sql = NULL;
|
||||
char *tagCond = NULL;
|
||||
char *tbnameCond = NULL;
|
||||
SArray *pTableIdList = NULL;
|
||||
|
@ -7061,7 +7078,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
SColumnInfo *pTagColumnInfo = NULL;
|
||||
SSqlGroupbyExpr *pGroupbyExpr = NULL;
|
||||
|
||||
code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &pSecExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo);
|
||||
code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &pSecExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo, &sql);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _over;
|
||||
}
|
||||
|
@ -7145,8 +7162,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
|
|||
goto _over;
|
||||
}
|
||||
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pSecExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery);
|
||||
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, pSecExprs, &tableGroupInfo, pTagColumnInfo, isSTableQuery, sql);
|
||||
|
||||
sql = NULL;
|
||||
pExprs = NULL;
|
||||
pSecExprs = NULL;
|
||||
pGroupbyExpr = NULL;
|
||||
|
@ -7170,6 +7188,7 @@ _over:
|
|||
}
|
||||
|
||||
free(pTagColumnInfo);
|
||||
free(sql);
|
||||
free(pExprs);
|
||||
free(pSecExprs);
|
||||
|
||||
|
|
Loading…
Reference in New Issue