[td-2819]
This commit is contained in:
parent
b6aa9da938
commit
8d39944396
|
@ -299,6 +299,7 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
|
|||
uint32_t tscGetTableMetaMaxSize();
|
||||
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
|
||||
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
|
||||
SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo);
|
||||
|
||||
void tsCreateSQLFunctionCtx(SQueryNodeInfo* pQueryInfo, SQLFunctionCtx* pCtx);
|
||||
void* createQueryInfoFromQueryNode(SQueryNodeInfo* pQueryNodeInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
|
||||
|
|
|
@ -214,8 +214,9 @@ typedef struct SQueryNodeInfo {
|
|||
int32_t bufLen;
|
||||
char* buf;
|
||||
|
||||
SArray* pDSOperator;
|
||||
SArray* pPhyOperator;
|
||||
SArray* pDSOperator; // data source operator
|
||||
SArray* pPhyOperator; // physical query execution plan
|
||||
SQuery* pQuery; // query object
|
||||
|
||||
struct SQueryNodeInfo *sibling; // sibling
|
||||
SArray *pUpstream; // SArray<struct SQueryNodeInfo>
|
||||
|
|
|
@ -126,7 +126,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql
|
|||
static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
|
||||
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
|
||||
static int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index);
|
||||
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, int64_t *uid);
|
||||
static int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, uint64_t *uid);
|
||||
static bool validateDebugFlag(int32_t v);
|
||||
static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryNodeInfo* pQueryInfo);
|
||||
|
||||
|
@ -709,7 +709,7 @@ static bool isTopBottomQuery(SQueryNodeInfo* pQueryInfo) {
|
|||
|
||||
// need to add timestamp column in result set, if it is a time window query
|
||||
static int32_t addPrimaryTsColumnForTimeWindowQuery(SQueryNodeInfo* pQueryInfo) {
|
||||
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid;
|
||||
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->base.uid;
|
||||
|
||||
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
|
@ -1499,7 +1499,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
|
|||
pArithExprInfo->base.numOfParams = 1;
|
||||
pArithExprInfo->base.resColId = getNewResColId(pQueryInfo);
|
||||
|
||||
int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid);
|
||||
int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &(pArithExprInfo->base.uid));
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
tExprTreeDestroy(pArithExprInfo->pExpr, NULL);
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "invalid expression in select clause");
|
||||
|
@ -3476,8 +3476,8 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryNodeInfo* p
|
|||
}
|
||||
|
||||
if (i == 0) {
|
||||
id = p1->uid;
|
||||
} else if (id != p1->uid) {
|
||||
id = p1->base.uid;
|
||||
} else if (id != p1->base.uid) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
}
|
||||
|
@ -6126,7 +6126,8 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex) {
|
|||
char tmpBuf[1024] = {0};
|
||||
int32_t tmpLen = 0;
|
||||
tmpLen =
|
||||
sprintf(tmpBuf, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->base.functionId].name, pExpr->uid, pExpr->base.colInfo.colId);
|
||||
sprintf(tmpBuf, "%s(uid:%" PRIu64 ", %d)", aAggs[pExpr->base.functionId].name, pExpr->base.uid,
|
||||
pExpr->base.colInfo.colId);
|
||||
|
||||
if (tmpLen + offset >= totalBufSize - 1) break;
|
||||
|
||||
|
@ -6842,7 +6843,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
|
|||
return TSDB_CODE_SUCCESS; // Does not build query message here
|
||||
}
|
||||
|
||||
int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, int64_t *uid) {
|
||||
int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryNodeInfo* pQueryInfo, SArray* pCols, uint64_t *uid) {
|
||||
tExprNode* pLeft = NULL;
|
||||
tExprNode* pRight= NULL;
|
||||
|
||||
|
@ -6892,7 +6893,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pS
|
|||
(*pExpr)->pSchema->colId = p1->base.resColId;
|
||||
|
||||
if (uid != NULL) {
|
||||
*uid = p1->uid;
|
||||
*uid = p1->base.uid;
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
|
@ -705,9 +705,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
|
||||
SQueryNodeInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
|
||||
SQuery* pQuery = tscCreateQueryFromQueryNodeInfo(pQueryInfo);
|
||||
UNUSED(pQuery);
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
/*
|
||||
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
|
||||
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
|
||||
|
@ -725,6 +728,311 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
tscError("%p illegal value of numOfGroupCols in query msg: %d", pSql, pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
*/
|
||||
|
||||
{
|
||||
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
|
||||
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
|
||||
|
||||
int32_t numOfTags = pQuery->numOfTags;
|
||||
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr);
|
||||
|
||||
if (pQuery->order.order == TSDB_ORDER_ASC) {
|
||||
pQueryMsg->window.skey = htobe64(pQuery->window.skey);
|
||||
pQueryMsg->window.ekey = htobe64(pQuery->window.ekey);
|
||||
} else {
|
||||
pQueryMsg->window.skey = htobe64(pQuery->window.ekey);
|
||||
pQueryMsg->window.ekey = htobe64(pQuery->window.skey);
|
||||
}
|
||||
|
||||
pQueryMsg->order = htons(pQuery->order.order);
|
||||
pQueryMsg->orderColId = htons(pQuery->order.orderColId);
|
||||
pQueryMsg->fillType = htons(pQuery->fillType);
|
||||
pQueryMsg->limit = htobe64(pQuery->limit.limit);
|
||||
pQueryMsg->offset = htobe64(pQuery->limit.offset);
|
||||
|
||||
pQueryMsg->numOfCols = htons(pQuery->numOfCols);
|
||||
|
||||
pQueryMsg->interval.interval = htobe64(pQuery->interval.interval);
|
||||
pQueryMsg->interval.sliding = htobe64(pQuery->interval.sliding);
|
||||
pQueryMsg->interval.offset = htobe64(pQuery->interval.offset);
|
||||
pQueryMsg->interval.intervalUnit = pQuery->interval.intervalUnit;
|
||||
pQueryMsg->interval.slidingUnit = pQuery->interval.slidingUnit;
|
||||
pQueryMsg->interval.offsetUnit = pQuery->interval.offsetUnit;
|
||||
|
||||
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
|
||||
pQueryMsg->tagNameRelType = htons(pQueryInfo->tagCond.relType);
|
||||
pQueryMsg->tbnameCondLen = htonl(pQueryInfo->tagCond.tbnameCond.len);
|
||||
pQueryMsg->numOfTags = htonl(numOfTags);
|
||||
pQueryMsg->queryType = htonl(pQueryInfo->type);
|
||||
// pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
|
||||
pQueryMsg->sqlstrLen = htonl(sqlLen);
|
||||
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
|
||||
pQueryMsg->sw.gap = htobe64(pQuery->sw.gap);
|
||||
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||
|
||||
pQueryMsg->numOfOutput = htons((int16_t)pQuery->numOfOutput); // this is the stage one output column number
|
||||
|
||||
// set column list ids
|
||||
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
|
||||
char *pMsg = (char *)(pQueryMsg->colList) + numOfCols * sizeof(SColumnInfo);
|
||||
SSchema *pSchema = tscGetTableSchema(pTableMeta);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfo *pCol = &pQuery->colList[i];
|
||||
|
||||
pQueryMsg->colList[i].colId = htons(pCol->colId);
|
||||
pQueryMsg->colList[i].bytes = htons(pCol->bytes);
|
||||
pQueryMsg->colList[i].type = htons(pCol->type);
|
||||
pQueryMsg->colList[i].numOfFilters = htons(pCol->numOfFilters);
|
||||
|
||||
// append the filter information after the basic column information
|
||||
for (int32_t f = 0; f < pCol->numOfFilters; ++f) {
|
||||
SColumnFilterInfo *pColFilter = &pCol->filterInfo[f];
|
||||
|
||||
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
|
||||
pFilterMsg->filterstr = htons(pColFilter->filterstr);
|
||||
|
||||
pMsg += sizeof(SColumnFilterInfo);
|
||||
|
||||
if (pColFilter->filterstr) {
|
||||
pFilterMsg->len = htobe64(pColFilter->len);
|
||||
memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
|
||||
pMsg += (pColFilter->len + 1); // append the additional filter binary info
|
||||
} else {
|
||||
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
|
||||
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
|
||||
}
|
||||
|
||||
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
|
||||
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
|
||||
|
||||
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
|
||||
tscError("invalid filter info");
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg;
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SSqlExpr *pExpr = &pQuery->pExpr1[i].base;
|
||||
|
||||
// the queried table has been removed and a new table with the same name has already been created already
|
||||
// return error msg
|
||||
if (pExpr->uid != pTableMeta->id.uid) {
|
||||
tscError("%p table has already been destroyed", pSql);
|
||||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
||||
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
||||
tscError("%p table schema is not matched with parsed sql", pSql);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
assert(pExpr->resColId < 0);
|
||||
|
||||
pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId);
|
||||
pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
|
||||
pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag);
|
||||
|
||||
pSqlExpr->colType = htons(pExpr->colType);
|
||||
pSqlExpr->colBytes = htons(pExpr->colBytes);
|
||||
pSqlExpr->resType = htons(pExpr->resType);
|
||||
pSqlExpr->resBytes = htons(pExpr->resBytes);
|
||||
|
||||
pSqlExpr->functionId = htons(pExpr->functionId);
|
||||
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
|
||||
pSqlExpr->resColId = htons(pExpr->resColId);
|
||||
pMsg += sizeof(SSqlExpr);
|
||||
|
||||
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
|
||||
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
|
||||
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
|
||||
|
||||
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
|
||||
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
|
||||
pMsg += pExpr->param[j].nLen;
|
||||
} else {
|
||||
pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64);
|
||||
}
|
||||
}
|
||||
|
||||
pSqlExpr = (SSqlExpr *)pMsg;
|
||||
}
|
||||
|
||||
if (pQuery->numOfExpr2 > 0) {
|
||||
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
|
||||
SSqlExpr *pExpr = &pQuery->pExpr2[i].base;
|
||||
|
||||
// the queried table has been removed and a new table with the same name has already been created already
|
||||
// return error msg
|
||||
if (pExpr->uid != pTableMeta->id.uid) {
|
||||
tscError("%p table has already been destroyed", pSql);
|
||||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
||||
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
|
||||
tscError("%p table schema is not matched with parsed sql", pSql);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
assert(pExpr->resColId < 0);
|
||||
|
||||
pSqlExpr->colInfo.colId = htons(pExpr->colInfo.colId);
|
||||
pSqlExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
|
||||
pSqlExpr->colInfo.flag = htons(pExpr->colInfo.flag);
|
||||
|
||||
pSqlExpr->colType = htons(pExpr->colType);
|
||||
pSqlExpr->colBytes = htons(pExpr->colBytes);
|
||||
pSqlExpr->resType = htons(pExpr->resType);
|
||||
pSqlExpr->resBytes = htons(pExpr->resBytes);
|
||||
|
||||
pSqlExpr->functionId = htons(pExpr->functionId);
|
||||
pSqlExpr->numOfParams = htons(pExpr->numOfParams);
|
||||
pSqlExpr->resColId = htons(pExpr->resColId);
|
||||
pMsg += sizeof(SSqlExpr);
|
||||
|
||||
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
|
||||
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
|
||||
pSqlExpr->param[j].nLen = htons(pExpr->param[j].nLen);
|
||||
|
||||
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
|
||||
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
|
||||
pMsg += pExpr->param[j].nLen;
|
||||
} else {
|
||||
pSqlExpr->param[j].i64 = htobe64(pExpr->param[j].i64);
|
||||
}
|
||||
}
|
||||
|
||||
pSqlExpr = (SSqlExpr *)pMsg;
|
||||
}
|
||||
} else {
|
||||
pQueryMsg->secondStageOutput = 0;
|
||||
}
|
||||
|
||||
// serialize the table info (sid, uid, tags)
|
||||
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg);
|
||||
|
||||
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
|
||||
if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) {
|
||||
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
|
||||
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
|
||||
|
||||
for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) {
|
||||
SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j);
|
||||
|
||||
*((int16_t *)pMsg) = htons(pCol->colId);
|
||||
pMsg += sizeof(pCol->colId);
|
||||
|
||||
*((int16_t *)pMsg) += htons(pCol->colIndex);
|
||||
pMsg += sizeof(pCol->colIndex);
|
||||
|
||||
*((int16_t *)pMsg) += htons(pCol->flag);
|
||||
pMsg += sizeof(pCol->flag);
|
||||
|
||||
memcpy(pMsg, pCol->name, tListLen(pCol->name));
|
||||
pMsg += tListLen(pCol->name);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQuery->fillType != TSDB_FILL_NONE) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
*((int64_t *)pMsg) = htobe64(pQuery->fillVal[i]);
|
||||
pMsg += sizeof(pQuery->fillVal[0]);
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfTags != 0) {
|
||||
int32_t numOfColumns = tscGetNumOfColumns(pTableMeta);
|
||||
int32_t numOfTagColumns = tscGetNumOfTags(pTableMeta);
|
||||
int32_t total = numOfTagColumns + numOfColumns;
|
||||
|
||||
pSchema = tscGetTableTagSchema(pTableMeta);
|
||||
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, i);
|
||||
SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex];
|
||||
|
||||
if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < -1) ||
|
||||
(!isValidDataType(pColSchema->type))) {
|
||||
char n[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, n);
|
||||
|
||||
tscError("%p tid:%d uid:%" PRIu64 " id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s",
|
||||
pSql, pTableMeta->id.tid, pTableMeta->id.uid, n, total, numOfTagColumns, pCol->colIndex.columnIndex, pColSchema->name);
|
||||
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
|
||||
|
||||
pTagCol->colId = htons(pColSchema->colId);
|
||||
pTagCol->bytes = htons(pColSchema->bytes);
|
||||
pTagCol->type = htons(pColSchema->type);
|
||||
pTagCol->numOfFilters = 0;
|
||||
|
||||
pMsg += sizeof(SColumnInfo);
|
||||
}
|
||||
}
|
||||
|
||||
// serialize tag column query condition
|
||||
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
|
||||
STagCond* pTagCond = &pQueryInfo->tagCond;
|
||||
|
||||
SCond *pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
|
||||
if (pCond != NULL && pCond->cond != NULL) {
|
||||
pQueryMsg->tagCondLen = htons(pCond->len);
|
||||
memcpy(pMsg, pCond->cond, pCond->len);
|
||||
|
||||
pMsg += pCond->len;
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->bufLen > 0) {
|
||||
memcpy(pMsg, pQueryInfo->buf, pQueryInfo->bufLen);
|
||||
pMsg += pQueryInfo->bufLen;
|
||||
}
|
||||
|
||||
SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
|
||||
if (pCond->len > 0) {
|
||||
strncpy(pMsg, pCond->cond, pCond->len);
|
||||
pMsg += pCond->len;
|
||||
}
|
||||
|
||||
// compressed ts block
|
||||
pQueryMsg->tsBuf.tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
|
||||
|
||||
if (pQueryInfo->tsBuf != NULL) {
|
||||
// note: here used the index instead of actual vnode id.
|
||||
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
|
||||
int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pMsg += pQueryMsg->tsBuf.tsLen;
|
||||
|
||||
pQueryMsg->tsBuf.tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
|
||||
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
|
||||
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
|
||||
}
|
||||
|
||||
memcpy(pMsg, pSql->sqlstr, sqlLen);
|
||||
pMsg += sqlLen;
|
||||
|
||||
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
|
||||
|
||||
tscDebug("%p msg built success, len:%d bytes", pSql, msgLen);
|
||||
pCmd->payloadLen = msgLen;
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
|
||||
pQueryMsg->head.contLen = htonl(msgLen);
|
||||
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
|
||||
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
|
||||
|
@ -818,14 +1126,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
SSqlExpr *pSqlFuncExpr = (SSqlExpr *)pMsg;
|
||||
SSqlExpr *pSqlExpr = (SSqlExpr *)pMsg;
|
||||
|
||||
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
|
||||
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
|
||||
// the queried table has been removed and a new table with the same name has already been created already
|
||||
// return error msg
|
||||
if (pExpr->uid != pTableMeta->id.uid) {
|
||||
if (pExpr->base.uid != pTableMeta->id.uid) {
|
||||
tscError("%p table has already been destroyed", pSql);
|
||||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
@ -837,45 +1145,45 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
assert(pExpr->base.resColId < 0);
|
||||
|
||||
pSqlFuncExpr->colInfo.colId = htons(pExpr->base.colInfo.colId);
|
||||
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->base.colInfo.colIndex);
|
||||
pSqlFuncExpr->colInfo.flag = htons(pExpr->base.colInfo.flag);
|
||||
pSqlExpr->colInfo.colId = htons(pExpr->base.colInfo.colId);
|
||||
pSqlExpr->colInfo.colIndex = htons(pExpr->base.colInfo.colIndex);
|
||||
pSqlExpr->colInfo.flag = htons(pExpr->base.colInfo.flag);
|
||||
|
||||
pSqlFuncExpr->colType = htons(pExpr->base.colType);
|
||||
pSqlFuncExpr->colBytes = htons(pExpr->base.colBytes);
|
||||
pSqlExpr->colType = htons(pExpr->base.colType);
|
||||
pSqlExpr->colBytes = htons(pExpr->base.colBytes);
|
||||
|
||||
if (TSDB_COL_IS_UD_COL(pExpr->base.colInfo.flag) || pExpr->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
pSqlFuncExpr->resType = htons(pExpr->base.resType);
|
||||
pSqlFuncExpr->resBytes = htons(pExpr->base.resBytes);
|
||||
pSqlExpr->resType = htons(pExpr->base.resType);
|
||||
pSqlExpr->resBytes = htons(pExpr->base.resBytes);
|
||||
} else if (pExpr->base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
SSchema s = tGetBlockDistColumnSchema();
|
||||
|
||||
pSqlFuncExpr->resType = htons(s.type);
|
||||
pSqlFuncExpr->resBytes = htons(s.bytes);
|
||||
pSqlExpr->resType = htons(s.type);
|
||||
pSqlExpr->resBytes = htons(s.bytes);
|
||||
} else {
|
||||
SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->base.colInfo.colId);
|
||||
pSqlFuncExpr->resType = htons(s->type);
|
||||
pSqlFuncExpr->resBytes = htons(s->bytes);
|
||||
pSqlExpr->resType = htons(s->type);
|
||||
pSqlExpr->resBytes = htons(s->bytes);
|
||||
}
|
||||
|
||||
pSqlFuncExpr->functionId = htons(pExpr->base.functionId);
|
||||
pSqlFuncExpr->numOfParams = htons(pExpr->base.numOfParams);
|
||||
pSqlFuncExpr->resColId = htons(pExpr->base.resColId);
|
||||
pSqlExpr->functionId = htons(pExpr->base.functionId);
|
||||
pSqlExpr->numOfParams = htons(pExpr->base.numOfParams);
|
||||
pSqlExpr->resColId = htons(pExpr->base.resColId);
|
||||
pMsg += sizeof(SSqlExpr);
|
||||
|
||||
for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { // todo add log
|
||||
pSqlFuncExpr->param[j].nType = htons((uint16_t)pExpr->base.param[j].nType);
|
||||
pSqlFuncExpr->param[j].nLen = htons(pExpr->base.param[j].nLen);
|
||||
pSqlExpr->param[j].nType = htons((uint16_t)pExpr->base.param[j].nType);
|
||||
pSqlExpr->param[j].nLen = htons(pExpr->base.param[j].nLen);
|
||||
|
||||
if (pExpr->base.param[j].nType == TSDB_DATA_TYPE_BINARY) {
|
||||
memcpy(pMsg, pExpr->base.param[j].pz, pExpr->base.param[j].nLen);
|
||||
pMsg += pExpr->base.param[j].nLen;
|
||||
} else {
|
||||
pSqlFuncExpr->param[j].i64 = htobe64(pExpr->base.param[j].i64);
|
||||
pSqlExpr->param[j].i64 = htobe64(pExpr->base.param[j].i64);
|
||||
}
|
||||
}
|
||||
|
||||
pSqlFuncExpr = (SSqlExpr *)pMsg;
|
||||
pSqlExpr = (SSqlExpr *)pMsg;
|
||||
}
|
||||
|
||||
size_t output = tscNumOfFields(pQueryInfo);
|
||||
|
@ -893,7 +1201,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
if (pExpr->pExpr == NULL) {
|
||||
// the queried table has been removed and a new table with the same name has already been created already
|
||||
// return error msg
|
||||
if (pExpr->uid != pTableMeta->id.uid) {
|
||||
if (pExpr->base.uid != pTableMeta->id.uid) {
|
||||
tscError("%p table has already been destroyed", pSql);
|
||||
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
|
|
@ -1463,7 +1463,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
|
|||
int32_t tableIndexOfSub = -1;
|
||||
for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, j);
|
||||
if (pTableMetaInfo->pTableMeta->id.uid == pExpr->uid) {
|
||||
if (pTableMetaInfo->pTableMeta->id.uid == pExpr->base.uid) {
|
||||
tableIndexOfSub = j;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1291,7 +1291,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
|
|||
}
|
||||
|
||||
static SExprInfo* doCreateSqlExpr(SQueryNodeInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
|
||||
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) {
|
||||
int16_t size, int16_t resColId, int16_t interSize, int32_t colType) {
|
||||
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex);
|
||||
|
||||
SExprInfo* pExpr = calloc(1, sizeof(SExprInfo));
|
||||
|
@ -1344,7 +1344,7 @@ static SExprInfo* doCreateSqlExpr(SQueryNodeInfo* pQueryInfo, int16_t functionId
|
|||
p->interBytes = interSize;
|
||||
|
||||
if (pTableMetaInfo->pTableMeta) {
|
||||
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
p->uid = pTableMetaInfo->pTableMeta->id.uid;
|
||||
}
|
||||
|
||||
return pExpr;
|
||||
|
@ -1460,8 +1460,7 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco
|
|||
for (int32_t i = 0; i < size; ++i) {
|
||||
SExprInfo* pExpr = taosArrayGetP(src, i);
|
||||
|
||||
if (pExpr->uid == uid) {
|
||||
|
||||
if (pExpr->base.uid == uid) {
|
||||
if (deepcopy) {
|
||||
SExprInfo* p1 = calloc(1, sizeof(SExprInfo));
|
||||
tscSqlExprAssign(p1, pExpr);
|
||||
|
@ -1983,7 +1982,6 @@ void tscInitQueryInfo(SQueryNodeInfo* pQueryInfo) {
|
|||
pQueryInfo->slimit.offset = 0;
|
||||
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
||||
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
}
|
||||
|
||||
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
|
||||
|
@ -3103,7 +3101,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo) {
|
|||
pQuery->vgId = 0;
|
||||
pQuery->stableQuery = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||
pQuery->groupbyColumn = tscGroupbyColumn(pQueryNodeInfo);
|
||||
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
|
||||
pQuery->window = pQueryNodeInfo->window;
|
||||
|
||||
{
|
||||
pQuery->numOfOutput = tscSqlExprNumOfExprs(pQueryNodeInfo);
|
||||
|
@ -3113,6 +3111,12 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo) {
|
|||
SExprInfo* pExpr = tscSqlExprGet(pQueryNodeInfo, i);
|
||||
tscSqlExprAssign(&pQuery->pExpr1[i], pExpr);
|
||||
}
|
||||
|
||||
pQuery->colList = calloc(numOfCols, sizeof(SColumnInfo));
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pQueryNodeInfo->colList, i);
|
||||
pQuery->colList[i] = pCol->info;
|
||||
}
|
||||
}
|
||||
|
||||
{// for simple table, not for super table
|
||||
|
@ -3154,6 +3158,7 @@ SQuery* tscCreateQueryFromQueryNodeInfo(SQueryNodeInfo* pQueryNodeInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
pQuery->interBufSize = getOutputInterResultBufSize(pQuery);
|
||||
return pQuery;
|
||||
}
|
||||
|
||||
|
|
|
@ -64,7 +64,6 @@ typedef struct SSqlExpr {
|
|||
|
||||
typedef struct SExprInfo {
|
||||
SSqlExpr base;
|
||||
int64_t uid;
|
||||
struct tExprNode* pExpr;
|
||||
} SExprInfo;
|
||||
|
||||
|
|
|
@ -397,28 +397,6 @@ typedef struct SColIndex {
|
|||
char name[TSDB_COL_NAME_LEN]; // TODO remove it
|
||||
} SColIndex;
|
||||
|
||||
/* sql function msg, to describe the message to vnode about sql function
|
||||
* operations in select */
|
||||
//typedef struct SSqlFuncMsg {
|
||||
// int16_t functionId;
|
||||
// int16_t numOfParams;
|
||||
//
|
||||
// int16_t resColId; // result column id, id of the current output column
|
||||
// int16_t colType;
|
||||
// int16_t colBytes;
|
||||
//
|
||||
// SColIndex colInfo;
|
||||
// struct ArgElem {
|
||||
// int16_t argType;
|
||||
// int16_t argBytes;
|
||||
// union {
|
||||
// double d;
|
||||
// int64_t i64;
|
||||
// char * pz;
|
||||
// } argValue;
|
||||
// } arg[3];
|
||||
//} SSqlFuncMsg;
|
||||
|
||||
typedef struct SColumnFilterInfo {
|
||||
int16_t lowerRelOptr;
|
||||
int16_t upperRelOptr;
|
||||
|
|
|
@ -5869,7 +5869,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
|
|||
|
||||
int32_t param = (int32_t)pExprs[i].base.param[0].i64;
|
||||
if (pExprs[i].base.functionId != TSDB_FUNC_ARITHM &&
|
||||
(type != pExprs[i].base.resType || bytes != pExprs[i].base.resBytes)) {
|
||||
(type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) {
|
||||
tfree(pExprs);
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue