commit
c83a07277f
|
@ -97,6 +97,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||||
pMsg->contentLen = pMsg->contentLen;
|
pMsg->contentLen = pMsg->contentLen;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
qDebugL("stream task string %s", (const char*)msg);
|
||||||
|
|
||||||
struct SSubplan* plan = NULL;
|
struct SSubplan* plan = NULL;
|
||||||
int32_t code = qStringToSubplan(msg, &plan);
|
int32_t code = qStringToSubplan(msg, &plan);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -2100,6 +2100,7 @@ static const char* jkSelectStmtHaving = "Having";
|
||||||
static const char* jkSelectStmtOrderBy = "OrderBy";
|
static const char* jkSelectStmtOrderBy = "OrderBy";
|
||||||
static const char* jkSelectStmtLimit = "Limit";
|
static const char* jkSelectStmtLimit = "Limit";
|
||||||
static const char* jkSelectStmtSlimit = "Slimit";
|
static const char* jkSelectStmtSlimit = "Slimit";
|
||||||
|
static const char* jkSelectStmtStmtName = "StmtName";
|
||||||
|
|
||||||
static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
||||||
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
|
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
|
||||||
|
@ -2135,6 +2136,9 @@ static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkSelectStmtSlimit, nodeToJson, pNode->pSlimit);
|
code = tjsonAddObject(pJson, jkSelectStmtSlimit, nodeToJson, pNode->pSlimit);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddStringToObject(pJson, jkSelectStmtStmtName, pNode->stmtName);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2173,6 +2177,9 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeObject(pJson, jkSelectStmtSlimit, &pNode->pSlimit);
|
code = jsonToNodeObject(pJson, jkSelectStmtSlimit, &pNode->pSlimit);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetStringValue(pJson, jkSelectStmtStmtName, pNode->stmtName);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1800,7 +1800,7 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
|
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) {
|
||||||
if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) {
|
if (NULL != pRoot && QUERY_NODE_SELECT_STMT == nodeType(pRoot)) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*) pRoot;
|
SSelectStmt* pSelect = (SSelectStmt*) pRoot;
|
||||||
*numOfCols = LIST_LENGTH(pSelect->pProjectionList);
|
*numOfCols = LIST_LENGTH(pSelect->pProjectionList);
|
||||||
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
|
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
|
||||||
|
@ -2400,13 +2400,14 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
switch (nodeType(pQuery->pRoot)) {
|
switch (nodeType(pQuery->pRoot)) {
|
||||||
case QUERY_NODE_SELECT_STMT:
|
case QUERY_NODE_SELECT_STMT:
|
||||||
pQuery->haveResultSet = true;
|
pQuery->haveResultSet = true;
|
||||||
pQuery->directRpc = false;
|
pQuery->directRpc = false;
|
||||||
pQuery->msgType = TDMT_VND_QUERY;
|
pQuery->msgType = TDMT_VND_QUERY;
|
||||||
code = qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema);
|
if (TSDB_CODE_SUCCESS != qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema)) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_VNODE_MODIF_STMT:
|
case QUERY_NODE_VNODE_MODIF_STMT:
|
||||||
pQuery->haveResultSet = false;
|
pQuery->haveResultSet = false;
|
||||||
|
@ -2446,7 +2447,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
|
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
|
||||||
|
|
Loading…
Reference in New Issue