[td-2859]<feature>: support nest query.
This commit is contained in:
parent
409ad8c4c1
commit
84e893e507
|
@ -230,7 +230,8 @@ typedef struct SQueryInfo {
|
||||||
char* buf;
|
char* buf;
|
||||||
|
|
||||||
struct SQueryInfo *sibling; // sibling
|
struct SQueryInfo *sibling; // sibling
|
||||||
SArray *pUpstream; //SArray<struct SQueryInfo>
|
SArray *pUpstream; // SArray<struct SQueryInfo>
|
||||||
|
SArray *pDownstream; // SArray<struct SQueryInfo>
|
||||||
} SQueryInfo;
|
} SQueryInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -6677,8 +6677,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t ind
|
||||||
tscInitQueryInfo(pQueryInfo1);
|
tscInitQueryInfo(pQueryInfo1);
|
||||||
|
|
||||||
pQueryInfo1->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
pQueryInfo1->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
pQueryInfo1->pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
for(int32_t x = 0; x < pCmd->numOfClause; ++x) {
|
for(int32_t x = 0; x < pCmd->numOfClause; ++x) {
|
||||||
taosArrayPush(pQueryInfo1->pUpstream, &pCmd->pQueryInfo[x]);
|
taosArrayPush(pQueryInfo1->pUpstream, &pCmd->pQueryInfo[x]);
|
||||||
|
taosArrayPush(pCmd->pQueryInfo[x]->pDownstream, &pQueryInfo1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryInfo1->numOfTables = 1;
|
pQueryInfo1->numOfTables = 1;
|
||||||
|
|
|
@ -2331,6 +2331,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
prepareInputDataFromUpstream(pRes, pQueryInfo);
|
prepareInputDataFromUpstream(pRes, pQueryInfo);
|
||||||
|
|
||||||
if (pSql->pSubscription != NULL) {
|
if (pSql->pSubscription != NULL) {
|
||||||
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
|
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
|
||||||
|
|
||||||
|
|
|
@ -372,7 +372,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||||
printf("abc\n");
|
if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) {
|
||||||
|
// handle the following query process
|
||||||
|
SQueryInfo* px = taosArrayGetP(pQueryInfo->pDownstream, 0);
|
||||||
|
printf("%d\n", px->type);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
||||||
|
@ -1772,6 +1776,8 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
|
||||||
pQueryInfo->slimit.limit = -1;
|
pQueryInfo->slimit.limit = -1;
|
||||||
pQueryInfo->slimit.offset = 0;
|
pQueryInfo->slimit.offset = 0;
|
||||||
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
|
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
|
||||||
|
|
|
@ -5770,7 +5770,7 @@ _cleanup:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
|
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg) {
|
||||||
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
|
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
|
||||||
|
|
||||||
tExprNode* pExprNode = NULL;
|
tExprNode* pExprNode = NULL;
|
||||||
|
@ -5812,17 +5812,25 @@ static int32_t updateOutputBufForTopBotQuery(SQueryTableMsg* pQueryMsg, SColumnI
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO tag length should be passed from client
|
// TODO tag length should be passed from client
|
||||||
|
typedef struct {
|
||||||
|
int32_t numOfOutput;
|
||||||
|
int32_t numOfTags;
|
||||||
|
int32_t numOfCols;
|
||||||
|
SColumnInfo* colList;
|
||||||
|
int32_t queryTest;
|
||||||
|
} SQueriedTableMeta;
|
||||||
|
|
||||||
int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
|
int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
|
||||||
SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols) {
|
SSqlFuncMsg** pExprMsg, SColumnInfo* pTagCols, bool isSuperTable) {
|
||||||
*pExprInfo = NULL;
|
*pExprInfo = NULL;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo));
|
SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo));
|
||||||
if (pExprs == NULL) {
|
if (pExprs == NULL) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
|
// bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
|
||||||
int16_t tagLen = 0;
|
int16_t tagLen = 0;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
|
Loading…
Reference in New Issue