[td-11818] support select *
This commit is contained in:
parent
1f6ff95bee
commit
3c88993c96
|
@ -135,9 +135,8 @@ typedef struct SQueryStmtInfo {
|
||||||
SArray *pUdfInfo;
|
SArray *pUdfInfo;
|
||||||
|
|
||||||
struct SQueryStmtInfo *sibling; // sibling
|
struct SQueryStmtInfo *sibling; // sibling
|
||||||
struct SQueryStmtInfo *pDownstream;
|
|
||||||
SMultiFunctionsDesc info;
|
SMultiFunctionsDesc info;
|
||||||
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
|
SArray *pDownstream; // SArray<struct SQueryStmtInfo>
|
||||||
int32_t havingFieldNum;
|
int32_t havingFieldNum;
|
||||||
int32_t exprListLevelIndex;
|
int32_t exprListLevelIndex;
|
||||||
} SQueryStmtInfo;
|
} SQueryStmtInfo;
|
||||||
|
|
|
@ -98,7 +98,7 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
|
||||||
* @param msgBufLen
|
* @param msgBufLen
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen);
|
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy the meta data request structure.
|
* Destroy the meta data request structure.
|
||||||
|
|
|
@ -213,7 +213,7 @@ SQueryStmtInfo *createQueryInfo() {
|
||||||
|
|
||||||
pQueryInfo->slimit.limit = -1;
|
pQueryInfo->slimit.limit = -1;
|
||||||
pQueryInfo->slimit.offset = 0;
|
pQueryInfo->slimit.offset = 0;
|
||||||
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||||
pQueryInfo->window = TSWINDOW_INITIALIZER;
|
pQueryInfo->window = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
pQueryInfo->exprList = calloc(10, POINTER_BYTES);
|
pQueryInfo->exprList = calloc(10, POINTER_BYTES);
|
||||||
|
@ -247,8 +247,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) {
|
||||||
tfree(pQueryInfo->fillVal);
|
tfree(pQueryInfo->fillVal);
|
||||||
tfree(pQueryInfo->buf);
|
tfree(pQueryInfo->buf);
|
||||||
|
|
||||||
taosArrayDestroy(pQueryInfo->pUpstream);
|
taosArrayDestroy(pQueryInfo->pDownstream);
|
||||||
pQueryInfo->pUpstream = NULL;
|
pQueryInfo->pDownstream = NULL;
|
||||||
pQueryInfo->bufLen = 0;
|
pQueryInfo->bufLen = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,9 +256,9 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) {
|
||||||
while (pQueryInfo != NULL) {
|
while (pQueryInfo != NULL) {
|
||||||
SQueryStmtInfo* p = pQueryInfo->sibling;
|
SQueryStmtInfo* p = pQueryInfo->sibling;
|
||||||
|
|
||||||
size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pUpstream);
|
size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream);
|
||||||
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
||||||
SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i);
|
SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i);
|
||||||
destroyQueryInfoImpl(pUpQueryInfo);
|
destroyQueryInfoImpl(pUpQueryInfo);
|
||||||
clearAllTableMetaInfo(pUpQueryInfo, false, 0);
|
clearAllTableMetaInfo(pUpQueryInfo, false, 0);
|
||||||
tfree(pUpQueryInfo);
|
tfree(pUpQueryInfo);
|
||||||
|
@ -288,7 +288,6 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI
|
||||||
}
|
}
|
||||||
|
|
||||||
pSub->pUdfInfo = pUdfInfo;
|
pSub->pUdfInfo = pUdfInfo;
|
||||||
pSub->pDownstream = pQueryInfo;
|
|
||||||
int32_t code = validateSqlNode(p, pSub, pMsgBuf);
|
int32_t code = validateSqlNode(p, pSub, pMsgBuf);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -311,7 +310,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SQueryStmtI
|
||||||
tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1);
|
tstrncpy(pTableMetaInfo1->aliasName, subInfo->aliasName.z, subInfo->aliasName.n + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pQueryInfo->pUpstream, &pSub);
|
taosArrayPush(pQueryInfo->pDownstream, &pSub);
|
||||||
|
|
||||||
// NOTE: order mix up in subquery not support yet.
|
// NOTE: order mix up in subquery not support yet.
|
||||||
pQueryInfo->order = pSub->order;
|
pQueryInfo->order = pSub->order;
|
||||||
|
@ -600,7 +599,7 @@ int32_t checkForUnsupportedQuery(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
|
if (f == FUNCTION_BLKINFO && taosArrayGetSize(pQueryInfo->pDownstream) > 0) {
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1584,7 +1583,6 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
|
||||||
}
|
}
|
||||||
|
|
||||||
pushDownAggFuncExprInfo(pQueryInfo);
|
pushDownAggFuncExprInfo(pQueryInfo);
|
||||||
// addColumnNodeFromLowerLevel(pQueryInfo);
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < 1; ++i) {
|
for(int32_t i = 0; i < 1; ++i) {
|
||||||
SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]);
|
SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]);
|
||||||
|
@ -3904,17 +3902,30 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
|
||||||
|
|
||||||
// TODO: check if the qnode info has been cached already
|
// TODO: check if the qnode info has been cached already
|
||||||
req.qNodeRequired = true;
|
req.qNodeRequired = true;
|
||||||
code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen);
|
code = qParserExtractRequestedMetaInfo(pInfo, &req, pCtx, msgBuf, msgBufLen);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// load the meta data from catalog
|
// load the meta data from catalog
|
||||||
code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data);
|
// code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data);
|
||||||
|
STableMeta* pmt = NULL;
|
||||||
|
|
||||||
|
SName* name = taosArrayGet(req.pTableName, 0);
|
||||||
|
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &pmt);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data.pTableMeta = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
taosArrayPush(data.pTableMeta, &pmt);
|
||||||
|
|
||||||
|
pQueryInfo->pTableMetaInfo = calloc(1, POINTER_BYTES);
|
||||||
|
pQueryInfo->pTableMetaInfo[0] = calloc(1, sizeof(STableMetaInfo));
|
||||||
|
pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt;
|
||||||
|
pQueryInfo->pTableMetaInfo[0]->name = *name;
|
||||||
|
pQueryInfo->numOfTables = 1;
|
||||||
|
|
||||||
// evaluate the sqlnode
|
// evaluate the sqlnode
|
||||||
STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
|
STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
|
||||||
assert(pTableMeta != NULL);
|
assert(pTableMeta != NULL);
|
||||||
|
|
|
@ -194,6 +194,18 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
|
||||||
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
|
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val = htonl(pCreate->maxRows);
|
||||||
|
if (val < TSDB_MIN_MAX_ROW_FBLOCK || val > TSDB_MAX_MAX_ROW_FBLOCK) {
|
||||||
|
snprintf(msg, tListLen(msg), "invalid number of max rows in file block for DB:%d valid range: [%d, %d]", val,
|
||||||
|
TSDB_MIN_MAX_ROW_FBLOCK, TSDB_MAX_MAX_ROW_FBLOCK);
|
||||||
|
}
|
||||||
|
|
||||||
|
val = htonl(pCreate->minRows);
|
||||||
|
if (val < TSDB_MIN_MIN_ROW_FBLOCK || val > TSDB_MAX_MIN_ROW_FBLOCK) {
|
||||||
|
snprintf(msg, tListLen(msg), "invalid number of min rows in file block for DB:%d valid range: [%d, %d]", val,
|
||||||
|
TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
pDcl->nodeType = info.type;
|
pDcl->nodeType = info.type;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
|
SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
||||||
if (pQueryInfo == NULL) {
|
if (pQueryInfo == NULL) {
|
||||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
|
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; // set correct error code.
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -89,7 +89,7 @@ int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf);
|
static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
|
||||||
|
|
||||||
static int32_t tnameComparFn(const void* p1, const void* p2) {
|
static int32_t tnameComparFn(const void* p1, const void* p2) {
|
||||||
SName* pn1 = (SName*)p1;
|
SName* pn1 = (SName*)p1;
|
||||||
|
@ -113,7 +113,7 @@ static int32_t tnameComparFn(const void* p1, const void* p2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) {
|
static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
|
||||||
int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list);
|
int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list);
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfSub; ++j) {
|
for (int32_t j = 0; j < numOfSub; ++j) {
|
||||||
|
@ -123,12 +123,12 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SSqlNode* p = taosArrayGetP(sub->pSubquery->node, i);
|
SSqlNode* p = taosArrayGetP(sub->pSubquery->node, i);
|
||||||
if (p->from->type == SQL_FROM_NODE_TABLES) {
|
if (p->from->type == SQL_FROM_NODE_TABLES) {
|
||||||
int32_t code = getTableNameFromSqlNode(p, tableNameList, pMsgBuf);
|
int32_t code = getTableNameFromSqlNode(p, tableNameList, pCtx, pMsgBuf);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
getTableNameFromSubquery(p, tableNameList, pMsgBuf);
|
getTableNameFromSubquery(p, tableNameList, pCtx, pMsgBuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -136,7 +136,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgBuf* pMsgBuf) {
|
int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SParseBasicCtx *pParseCtx, SMsgBuf* pMsgBuf) {
|
||||||
const char* msg1 = "invalid table name";
|
const char* msg1 = "invalid table name";
|
||||||
|
|
||||||
int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list);
|
int32_t numOfTables = (int32_t) taosArrayGetSize(pSqlNode->from->list);
|
||||||
|
@ -155,7 +155,11 @@ int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList, SMsgB
|
||||||
}
|
}
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
strndequote(name.tname, t->z, t->n);
|
int32_t code = createSName(&name, t, pParseCtx, pMsgBuf);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayPush(tableNameList, &name);
|
taosArrayPush(tableNameList, &name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,7 +170,7 @@ static void freePtrElem(void* p) {
|
||||||
tfree(*(char**)p);
|
tfree(*(char**)p);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) {
|
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, SParseBasicCtx *pCtx, char* msg, int32_t msgBufLen) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen};
|
SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen};
|
||||||
|
|
||||||
|
@ -182,12 +186,12 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* p
|
||||||
|
|
||||||
// load the table meta in the FROM clause
|
// load the table meta in the FROM clause
|
||||||
if (pSqlNode->from->type == SQL_FROM_NODE_TABLES) {
|
if (pSqlNode->from->type == SQL_FROM_NODE_TABLES) {
|
||||||
code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, &msgBuf);
|
code = getTableNameFromSqlNode(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, &msgBuf);
|
code = getTableNameFromSubquery(pSqlNode, pMetaInfo->pTableName, pCtx, &msgBuf);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,12 +77,15 @@ void sqlCheck(const char* sql, bool valid) {
|
||||||
buf.len = 128;
|
buf.len = 128;
|
||||||
buf.buf = msg;
|
buf.buf = msg;
|
||||||
|
|
||||||
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.sub.node), 0);
|
SSqlNode* pNode = (SSqlNode*)taosArrayGetP(((SArray*)info1.sub.node), 0);
|
||||||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -119,7 +122,11 @@ TEST(testCase, validateAST_test) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -177,7 +184,11 @@ TEST(testCase, function_Test) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -223,7 +234,11 @@ TEST(testCase, function_Test2) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -269,7 +284,11 @@ TEST(testCase, function_Test3) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -314,7 +333,11 @@ TEST(testCase, function_Test4) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -362,7 +385,11 @@ TEST(testCase, function_Test5) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -447,7 +474,11 @@ TEST(testCase, function_Test6) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -525,7 +556,11 @@ TEST(testCase, function_Test6) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -587,7 +622,11 @@ TEST(testCase, function_Test6) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
|
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -636,7 +675,7 @@ TEST(testCase, function_Test6) {
|
||||||
code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -666,7 +705,10 @@ TEST(testCase, function_Test6) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -688,7 +730,7 @@ TEST(testCase, function_Test6) {
|
||||||
code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,8 @@ void generateLogicplan(const char* sql) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
@ -121,7 +122,9 @@ TEST(testCase, planner_test) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
SParseBasicCtx ctx = {0};
|
||||||
|
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
|
|
@ -710,7 +710,11 @@ TEST(testCase, extractMeta_test) {
|
||||||
|
|
||||||
char msg[128] = {0};
|
char msg[128] = {0};
|
||||||
SCatalogReq req = {0};
|
SCatalogReq req = {0};
|
||||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
|
||||||
|
SParseBasicCtx ctx = {0};
|
||||||
|
ctx.db = "db1";
|
||||||
|
ctx.acctId = 1;
|
||||||
|
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, &ctx, msg, 128);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||||
|
|
||||||
|
|
|
@ -53,8 +53,8 @@ typedef struct SQueryDistPlanNodeInfo {
|
||||||
typedef struct SQueryTableInfo {
|
typedef struct SQueryTableInfo {
|
||||||
char *tableName; // to be deleted
|
char *tableName; // to be deleted
|
||||||
uint64_t uid; // to be deleted
|
uint64_t uid; // to be deleted
|
||||||
STableMetaInfo* pMeta;
|
STableMetaInfo *pMeta;
|
||||||
STimeWindow window;
|
STimeWindow window;
|
||||||
} SQueryTableInfo;
|
} SQueryTableInfo;
|
||||||
|
|
||||||
typedef struct SQueryPlanNode {
|
typedef struct SQueryPlanNode {
|
||||||
|
|
|
@ -64,10 +64,11 @@ static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) {
|
int32_t createSelectPlan(const SQueryStmtInfo* pSelect, SQueryPlanNode** pQueryPlan) {
|
||||||
SArray* upstream = createQueryPlanImpl(pSelect);
|
SArray* pDownstream = createQueryPlanImpl(pSelect);
|
||||||
assert(taosArrayGetSize(upstream) == 1);
|
assert(taosArrayGetSize(pDownstream) == 1);
|
||||||
*pQueryPlan = taosArrayGetP(upstream, 0);
|
|
||||||
taosArrayDestroy(upstream);
|
*pQueryPlan = taosArrayGetP(pDownstream, 0);
|
||||||
|
taosArrayDestroy(pDownstream);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,23 +101,21 @@ void destroyQueryPlan(SQueryPlanNode* pQueryNode) {
|
||||||
|
|
||||||
//======================================================================================================================
|
//======================================================================================================================
|
||||||
|
|
||||||
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
|
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** pChildrenNode, int32_t numOfChildren,
|
||||||
SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) {
|
SExprInfo** pExpr, int32_t numOfOutput, const void* pExtInfo) {
|
||||||
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
|
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
|
||||||
|
|
||||||
pNode->info.type = type;
|
pNode->info.type = type;
|
||||||
pNode->info.name = strdup(name);
|
pNode->info.name = strdup(name);
|
||||||
|
|
||||||
pNode->numOfExpr = numOfOutput;
|
pNode->numOfExpr = numOfOutput;
|
||||||
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
|
||||||
taosArrayPush(pNode->pExpr, &pExpr[i]);
|
taosArrayAddBatch(pNode->pExpr, pExpr, numOfOutput);
|
||||||
}
|
assert(pNode->numOfExpr == numOfOutput);
|
||||||
|
|
||||||
pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
|
pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
|
||||||
for(int32_t i = 0; i < numOfPrev; ++i) {
|
for(int32_t i = 0; i < numOfChildren; ++i) {
|
||||||
taosArrayPush(pNode->pChildren, &prev[i]);
|
taosArrayPush(pNode->pChildren, &pChildrenNode[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(type) {
|
switch(type) {
|
||||||
|
@ -184,8 +183,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
|
||||||
return pNode;
|
return pNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
|
static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQueryTableInfo* info, SArray* pExprs, SArray* tableCols) {
|
||||||
SArray* pExprs, SArray* tableCols) {
|
|
||||||
if (pQueryInfo->info.onlyTagQuery) {
|
if (pQueryInfo->info.onlyTagQuery) {
|
||||||
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
||||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
|
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
|
||||||
|
@ -193,16 +191,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, ST
|
||||||
if (pQueryInfo->info.distinct) {
|
if (pQueryInfo->info.distinct) {
|
||||||
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
|
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pNode;
|
return pNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
|
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
|
||||||
|
|
||||||
if (pQueryInfo->info.projectionQuery) {
|
if (!pQueryInfo->info.projectionQuery) {
|
||||||
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
|
|
||||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL);
|
|
||||||
} else {
|
|
||||||
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
|
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
|
||||||
|
|
||||||
// table source column projection, generate the projection expr
|
// table source column projection, generate the projection expr
|
||||||
|
@ -262,7 +256,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
|
||||||
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
|
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
|
// here we can push down the projection to tablescan operator.
|
||||||
|
pNode->numOfExpr = num;
|
||||||
|
pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
|
||||||
|
taosArrayAddAll(pNode->pExpr, p);
|
||||||
|
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,9 +297,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTable(const SQueryStmtInfo* pQu
|
||||||
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
||||||
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
|
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
|
||||||
|
info.window = pQueryInfo->window;
|
||||||
|
info.pMeta = pTableMetaInfo;
|
||||||
|
|
||||||
// handle the only tag query
|
// handle the only tag query
|
||||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols);
|
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, pExprs, tableCols);
|
||||||
if (pQueryInfo->info.onlyTagQuery) {
|
if (pQueryInfo->info.onlyTagQuery) {
|
||||||
tfree(info.tableName);
|
tfree(info.tableName);
|
||||||
return pNode;
|
return pNode;
|
||||||
|
@ -326,23 +326,23 @@ static bool isAllAggExpr(SArray* pList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
|
SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
|
||||||
SArray* upstream = NULL;
|
SArray* pDownstream = NULL;
|
||||||
|
|
||||||
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause
|
if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) { // subquery in the from clause
|
||||||
upstream = taosArrayInit(4, POINTER_BYTES);
|
pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pQueryInfo->pUpstream);
|
size_t size = taosArrayGetSize(pQueryInfo->pDownstream);
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
|
SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pDownstream, i);
|
||||||
SArray* p = createQueryPlanImpl(pq);
|
SArray* p = createQueryPlanImpl(pq);
|
||||||
taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
|
taosArrayAddBatch(pDownstream, p->pData, (int32_t) taosArrayGetSize(p));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueryInfo->numOfTables > 1) { // it is a join query
|
if (pQueryInfo->numOfTables > 1) { // it is a join query
|
||||||
// 1. separate the select clause according to table
|
// 1. separate the select clause according to table
|
||||||
taosArrayDestroy(upstream);
|
taosArrayDestroy(pDownstream);
|
||||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
pDownstream = taosArrayInit(5, POINTER_BYTES);
|
||||||
|
|
||||||
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i];
|
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i];
|
||||||
|
@ -365,30 +365,30 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
|
||||||
columnListCopy(tableColumnList, pQueryInfo->colList, uid);
|
columnListCopy(tableColumnList, pQueryInfo->colList, uid);
|
||||||
|
|
||||||
// 4. add the projection query node
|
// 4. add the projection query node
|
||||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList);
|
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList);
|
||||||
columnListDestroy(tableColumnList);
|
columnListDestroy(tableColumnList);
|
||||||
// dropAllExprInfo(exprList);
|
// dropAllExprInfo(exprList);
|
||||||
taosArrayPush(upstream, &pNode);
|
taosArrayPush(pDownstream, &pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. add the join node here
|
// 3. add the join node here
|
||||||
SQueryTableInfo info = {0};
|
SQueryTableInfo info = {0};
|
||||||
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
|
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
|
||||||
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
|
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", pDownstream->pData, pQueryInfo->numOfTables,
|
||||||
pQueryInfo->exprList[0]->pData, num, NULL);
|
pQueryInfo->exprList[0]->pData, num, NULL);
|
||||||
|
|
||||||
// 4. add the aggregation or projection execution node
|
// 4. add the aggregation or projection execution node
|
||||||
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
|
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
|
||||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
pDownstream = taosArrayInit(5, POINTER_BYTES);
|
||||||
taosArrayPush(upstream, &pNode);
|
taosArrayPush(pDownstream, &pNode);
|
||||||
} else { // only one table, normal query process
|
} else { // only one table, normal query process
|
||||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||||
SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
|
SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
|
||||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
pDownstream = taosArrayInit(5, POINTER_BYTES);
|
||||||
taosArrayPush(upstream, &pNode);
|
taosArrayPush(pDownstream, &pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return upstream;
|
return pDownstream;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
|
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
|
||||||
|
@ -434,22 +434,23 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
|
||||||
switch(pQueryNode->info.type) {
|
switch(pQueryNode->info.type) {
|
||||||
case QNODE_TABLESCAN: {
|
case QNODE_TABLESCAN: {
|
||||||
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
|
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
|
||||||
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid,
|
len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid);
|
||||||
pInfo->window.skey, pInfo->window.ekey);
|
|
||||||
assert(len1 > 0);
|
assert(len1 > 0);
|
||||||
len += len1;
|
len += len1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
len1 = sprintf(buf + len, " , cols:");
|
||||||
SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i);
|
assert(len1 > 0);
|
||||||
len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId);
|
len += len1;
|
||||||
|
|
||||||
assert(len1 > 0);
|
len = printExprInfo(buf, pQueryNode, len);
|
||||||
len += len1;
|
len1 = sprintf(buf + len, ")");
|
||||||
}
|
|
||||||
|
|
||||||
len1 = sprintf(buf + len, "\n");
|
|
||||||
assert(len1 > 0);
|
assert(len1 > 0);
|
||||||
|
|
||||||
|
// todo print filter info
|
||||||
|
len1 = sprintf(buf + len, ") filters:(nil)");
|
||||||
|
len += len1;
|
||||||
|
|
||||||
|
len1 = sprintf(buf + len, " time_range: %" PRId64 " - %" PRId64"\n", pInfo->window.skey, pInfo->window.ekey);
|
||||||
len += len1;
|
len += len1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -187,7 +187,8 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||||
SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
|
SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
|
||||||
subplan->id = pCxt->nextId;
|
subplan->id = pCxt->nextId;
|
||||||
++(pCxt->nextId.subplanId);
|
++(pCxt->nextId.subplanId);
|
||||||
subplan->type = type;
|
|
||||||
|
subplan->type = type;
|
||||||
subplan->level = 0;
|
subplan->level = 0;
|
||||||
if (NULL != pCxt->pCurrentSubplan) {
|
if (NULL != pCxt->pCurrentSubplan) {
|
||||||
subplan->level = pCxt->pCurrentSubplan->level + 1;
|
subplan->level = pCxt->pCurrentSubplan->level + 1;
|
||||||
|
@ -275,6 +276,8 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
case QNODE_TABLESCAN:
|
case QNODE_TABLESCAN:
|
||||||
node = createTableScanNode(pCxt, pPlanNode);
|
node = createTableScanNode(pCxt, pPlanNode);
|
||||||
break;
|
break;
|
||||||
|
case QNODE_PROJECT:
|
||||||
|
// node = create
|
||||||
case QNODE_MODIFY:
|
case QNODE_MODIFY:
|
||||||
// Insert is not an operator in a physical plan.
|
// Insert is not an operator in a physical plan.
|
||||||
break;
|
break;
|
||||||
|
@ -335,7 +338,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
||||||
.pCatalog = pCatalog,
|
.pCatalog = pCatalog,
|
||||||
.pDag = validPointer(calloc(1, sizeof(SQueryDag))),
|
.pDag = validPointer(calloc(1, sizeof(SQueryDag))),
|
||||||
.pCurrentSubplan = NULL,
|
.pCurrentSubplan = NULL,
|
||||||
.nextId = {0} // todo queryid
|
.nextId = {.queryId = requestId},
|
||||||
};
|
};
|
||||||
|
|
||||||
*pDag = context.pDag;
|
*pDag = context.pDag;
|
||||||
|
|
|
@ -230,9 +230,11 @@ static bool columnInfoToJson(const void* obj, cJSON* jCol) {
|
||||||
if (res) {
|
if (res) {
|
||||||
res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes);
|
res = cJSON_AddNumberToObject(jCol, jkColumnInfoBytes, col->bytes);
|
||||||
}
|
}
|
||||||
if (res) {
|
|
||||||
res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters);
|
if (res) { // TODO: temporarily disable it
|
||||||
|
// res = addRawArray(jCol, jkColumnInfoFilterList, columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters);
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -794,7 +796,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
|
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
|
||||||
|
|
||||||
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
|
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
|
||||||
if (res) {
|
if (res) {
|
||||||
res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode);
|
res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode);
|
||||||
|
@ -807,6 +808,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
|
||||||
cJSON_Delete(jSubplan);
|
cJSON_Delete(jSubplan);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return jSubplan;
|
return jSubplan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,10 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* str = NULL;
|
||||||
|
queryPlanToString(logicPlan, &str);
|
||||||
|
printf("%s\n", str);
|
||||||
|
|
||||||
code = optimizeQueryPlan(logicPlan);
|
code = optimizeQueryPlan(logicPlan);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
destroyQueryPlan(logicPlan);
|
destroyQueryPlan(logicPlan);
|
||||||
|
|
|
@ -88,7 +88,6 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,19 +247,20 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t addNum = 0;
|
int32_t addNum = 0;
|
||||||
int32_t nodeNum = taosArrayGetSize(job->nodeList);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
|
||||||
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
|
|
||||||
|
|
||||||
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
|
|
||||||
qError("taosArrayPush failed");
|
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
||||||
}
|
|
||||||
|
|
||||||
++addNum;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (job->nodeList) {
|
||||||
|
int32_t nodeNum = (int32_t) taosArrayGetSize(job->nodeList);
|
||||||
|
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||||
|
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
|
||||||
|
|
||||||
|
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
|
||||||
|
qError("taosArrayPush failed");
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
++addNum;
|
||||||
|
}
|
||||||
|
}
|
||||||
/*
|
/*
|
||||||
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||||
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
|
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
|
||||||
|
@ -279,8 +279,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks),
|
qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), pJob->queryId);
|
||||||
pJob->queryId);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -997,7 +996,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
|
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) {
|
||||||
if (NULL == transport || NULL == nodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
if (NULL == transport || /*NULL == nodeList || */NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue