diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 5aa22bdcad..a936ea3b54 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -27,7 +27,7 @@ extern "C" { typedef int32_t VarDataOffsetT; typedef uint32_t TDRowLenT; typedef uint8_t TDRowValT; -typedef uint16_t col_id_t; +typedef int16_t col_id_t; typedef int8_t col_type_t; #pragma pack(push, 1) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 39ecd89619..18417cb608 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -166,6 +166,7 @@ typedef struct SScanPhysiNode { typedef SScanPhysiNode SSystemTableScanPhysiNode; typedef SScanPhysiNode STagScanPhysiNode; +typedef SScanPhysiNode SStreamScanPhysiNode; typedef struct STableScanPhysiNode { SScanPhysiNode scan; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index fe44d8666b..a55a0e218d 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -23,6 +23,9 @@ extern "C" { #include "nodes.h" #include "tmsg.h" +#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema))) +#define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo))) + typedef struct SRawExprNode { ENodeType nodeType; char* p; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 77838c705f..23bcdabb1b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -26,6 +26,7 @@ typedef struct SParseContext { uint64_t requestId; int32_t acctId; const char *db; + bool streamQuery; void *pTransporter; SEpSet mgmtEpSet; const char *pSql; // sql string diff --git a/include/util/tjson.h b/include/util/tjson.h index 1218caae35..b27f3b93ac 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -44,6 +44,7 @@ int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal); int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pVal); int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal); int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal); +int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal); int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal); int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal); int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal); @@ -60,6 +61,7 @@ int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* typedef int32_t (*FToObject)(const SJson* pJson, void* pObj); int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj); +int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize); int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize); char* tjsonToString(const SJson* pJson); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b672c9bb47..e341729a8f 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -179,6 +179,7 @@ typedef struct SRequestObj { uint64_t requestId; int32_t type; // request type STscObj* pTscObj; + char* pDb; char* sqlstr; // sql string int32_t sqlLen; int64_t self; @@ -229,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); -int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery); +int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); // --- heartbeat diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 525c5f9fb8..2233742625 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -150,6 +150,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty return NULL; } + pRequest->pDb = getDbOfConnection(pObj); pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampMs(); @@ -180,6 +181,7 @@ static void doDestroyRequest(void *p) { tfree(pRequest->msgBuf); tfree(pRequest->sqlstr); tfree(pRequest->pInfo); + tfree(pRequest->pDb); doFreeReqResultInfo(&pRequest->body.resInfo); qDestroyQueryPlan(pRequest->body.pDag); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index fc97f4113c..aa94ed42fd 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -137,13 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* return TSDB_CODE_SUCCESS; } -int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) { +int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) { STscObj* pTscObj = pRequest->pTscObj; SParseContext cxt = { .requestId = pRequest->requestId, .acctId = pTscObj->acctId, - .db = getDbOfConnection(pTscObj), + .db = pRequest->pDb, + .streamQuery = streamQuery, .pSql = pRequest->sqlstr, .sqlLen = pRequest->sqlLen, .pMsg = pRequest->msgBuf, @@ -154,7 +155,6 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) { cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog); if (code != TSDB_CODE_SUCCESS) { - tfree(cxt.db); return code; } @@ -163,7 +163,6 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) { setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); } - tfree(cxt.db); return code; } @@ -249,7 +248,7 @@ TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) { terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return); if (pQuery->directRpc) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ee13527b96..2aebd67c56 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -482,38 +482,24 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i } tscDebug("start to create topic, %s", topicName); -#if 0 - CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); - pQueryNode->streamQuery = true; + CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return); // todo check for invalid sql statement and return with error code - SSchema* schema = NULL; - int32_t numOfCols = 0; - CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, NULL), _return); - - pStr = qQueryPlanToString(pRequest->body.pDag); - if (pStr == NULL) { - goto _return; - } + CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &pStr, NULL), _return); /*printf("%s\n", pStr);*/ - // The topic should be related to a database that the queried table is belonged to. - SName name = {0}; - char dbName[TSDB_DB_FNAME_LEN] = {0}; - // tNameGetFullDbName(&((SQueryStmtInfo*)pQueryNode)->pTableMetaInfo[0]->name, dbName); - - tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB); - tNameFromString(&name, topicName, T_NAME_TABLE); + SName name = { .acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T }; + strcpy(name.dbname, pRequest->pDb); + strcpy(name.tname, topicName); SCMCreateTopicReq req = { .igExists = 1, - .physicalPlan = (char*)pStr, + .ast = (char*)pStr, .sql = (char*)sql, - .logicalPlan = (char*)"no logic plan", }; tNameExtractFullName(&name, req.name); @@ -536,7 +522,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pRequest->body.rspSem); -#endif + _return: qDestroyQuery(pQueryNode); /*if (sendInfo != NULL) {*/ diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4b649d5f62..31328cc4b2 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1072,6 +1072,10 @@ void blockDataClearup(SSDataBlock* pDataBlock) { } int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { + if (0 == numOfRows) { + return TSDB_CODE_SUCCESS; + } + if (IS_VAR_DATA_TYPE(pColumn->info.type)) { char* tmp = realloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows); if (tmp == NULL) { @@ -1092,7 +1096,7 @@ int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRo pColumn->nullbitmap = tmp; memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); - + assert(pColumn->info.bytes); tmp = realloc(pColumn->pData, numOfRows * pColumn->info.bytes); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1137,7 +1141,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) { taosArrayDestroy(pBlock->pDataBlock); tfree(pBlock->pBlockAgg); - tfree(pBlock); + // tfree(pBlock); return NULL; } @@ -1190,7 +1194,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { } int32_t len = colDataGetLength(pColData, rows); - taosEncodeFixedI32(buf, len); + tlen += taosEncodeFixedI32(buf, len); tlen += taosEncodeBinary(buf, pColData->pData, len); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a2342ec85a..0c4b933c19 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tdatablock.h" #include "vnode.h" STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { @@ -128,10 +129,13 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int j = 0; for (int32_t i = 0; i < colNumNeed; i++) { - int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i); + int16_t colId = *(int16_t*)taosArrayGet(pHandle->pColIdList, i); while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { j++; } + if (j >= pSchemaWrapper->nCols) { + continue; + } SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; SColumnInfoData colInfo = {0}; int sz = numOfRows * pColSchema->bytes; @@ -145,6 +149,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { taosArrayDestroy(pArray); return NULL; } + + blockDataEnsureColumnCapacity(&colInfo, numOfRows); taosArrayPush(pArray, &colInfo); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index d1dc2dec58..d66203eb40 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -197,7 +197,7 @@ static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { } static STableMeta* tableMetaClone(const STableMeta* pSrc) { - int32_t len = sizeof(STableMeta) + (pSrc->tableInfo.numOfTags + pSrc->tableInfo.numOfColumns) * sizeof(SSchema); + int32_t len = TABLE_META_SIZE(pSrc); STableMeta* pDst = malloc(len); if (NULL == pDst) { return NULL; @@ -207,7 +207,7 @@ static STableMeta* tableMetaClone(const STableMeta* pSrc) { } static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) { - int32_t len = sizeof(SVgroupsInfo) + pSrc->numOfVgroups * sizeof(SVgroupInfo); + int32_t len = VGROUPS_INFO_SIZE(pSrc); SVgroupsInfo* pDst = malloc(len); if (NULL == pDst) { return NULL; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 44f2318cc4..c61a2d71fb 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -193,6 +193,17 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { + STableMeta* pNode = (STableMeta*)pObj; + + int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid); + } + + return code; +} + static const char* jkLogicPlanTargets = "Targets"; static const char* jkLogicPlanConditions = "Conditions"; static const char* jkLogicPlanChildren = "Children"; @@ -441,6 +452,14 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } +static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) { + return physiScanNodeToJson(pObj, pJson); +} + +static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) { + return jsonToPhysiScanNode(pJson, pObj); +} + static const char* jkProjectPhysiPlanProjections = "Projections"; static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { @@ -1269,6 +1288,193 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkTableDbName = "DbName"; +static const char* jkTableTableName = "tableName"; +static const char* jkTableTableAlias = "tableAlias"; + +static int32_t tableNodeToJson(const void* pObj, SJson* pJson) { + const STableNode* pNode = (const STableNode*)pObj; + + int32_t code = exprNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkTableDbName, pNode->dbName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkTableTableName, pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkTableTableAlias, pNode->tableAlias); + } + + return code; +} + +static int32_t jsonToTableNode(const SJson* pJson, void* pObj) { + STableNode* pNode = (STableNode*)pObj; + + int32_t code = jsonToExprNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkTableDbName, pNode->dbName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkTableTableName, pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkTableTableAlias, pNode->tableAlias); + } + + return code; +} + +static const char* jkEpSetInUse = "InUse"; +static const char* jkEpSetNumOfEps = "NumOfEps"; +static const char* jkEpSetEps = "Eps"; + +static int32_t epSetToJson(const void* pObj, SJson* pJson) { + const SEpSet* pNode = (const SEpSet*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkEpSetInUse, pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEpSetNumOfEps, pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddArray(pJson, jkEpSetEps, epToJson, pNode->eps, sizeof(SEp), pNode->numOfEps); + } + + return code; +} + +static int32_t jsonToEpSet(const SJson* pJson, void* pObj) { + SEpSet* pNode = (SEpSet*)pObj; + + int32_t code = tjsonGetTinyIntValue(pJson, jkEpSetInUse, &pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkEpSetNumOfEps, &pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToArray(pJson, jkEpSetEps, jsonToEp, pNode->eps, sizeof(SEp)); + } + + return code; +} + +static const char* jkVgroupInfoVgId = "VgId"; +static const char* jkVgroupInfoHashBegin = "HashBegin"; +static const char* jkVgroupInfoHashEnd = "HashEnd"; +static const char* jkVgroupInfoEpSet = "EpSet"; +static const char* jkVgroupInfoNumOfTable = "NumOfTable"; + +static int32_t vgroupInfoToJson(const void* pObj, SJson* pJson) { + const SVgroupInfo* pNode = (const SVgroupInfo*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkVgroupInfoVgId, pNode->vgId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVgroupInfoHashBegin, pNode->hashBegin); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVgroupInfoHashEnd, pNode->hashEnd); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkVgroupInfoEpSet, epSetToJson, &pNode->epSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkVgroupInfoNumOfTable, pNode->numOfTable); + } + + return code; +} + +static int32_t jsonToVgroupInfo(const SJson* pJson, void* pObj) { + SVgroupInfo* pNode = (SVgroupInfo*)pObj; + + int32_t code = tjsonGetIntValue(pJson, jkVgroupInfoVgId, &pNode->vgId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUIntValue(pJson, jkVgroupInfoHashBegin, &pNode->hashBegin); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUIntValue(pJson, jkVgroupInfoHashEnd, &pNode->hashEnd); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkVgroupInfoEpSet, jsonToEpSet, &pNode->epSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkVgroupInfoNumOfTable, &pNode->numOfTable); + } + + return code; +} + +static const char* jkVgroupsInfoNum = "Num"; +static const char* jkVgroupsInfoVgroups = "Vgroups"; + +static int32_t vgroupsInfoToJson(const void* pObj, SJson* pJson) { + const SVgroupsInfo* pNode = (const SVgroupsInfo*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkVgroupsInfoNum, pNode->numOfVgroups); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddArray(pJson, jkVgroupsInfoVgroups, vgroupInfoToJson, pNode->vgroups, sizeof(SVgroupInfo), pNode->numOfVgroups); + } + + return code; +} + +static int32_t jsonToVgroupsInfo(const SJson* pJson, void* pObj) { + SVgroupsInfo* pNode = (SVgroupsInfo*)pObj; + + int32_t code = tjsonGetIntValue(pJson, jkVgroupsInfoNum, &pNode->numOfVgroups); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToArray(pJson, jkVgroupsInfoVgroups, jsonToVgroupInfo, pNode->vgroups, sizeof(SVgroupInfo)); + } + + return code; +} + +static const char* jkRealTableMetaSize = "MetaSize"; +static const char* jkRealTableMeta = "Meta"; +static const char* jkRealTableVgroupsInfoSize = "VgroupsInfoSize"; +static const char* jkRealTableVgroupsInfo = "VgroupsInfo"; + +static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) { + const SRealTableNode* pNode = (const SRealTableNode*)pObj; + + int32_t code = tableNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkRealTableMetaSize, TABLE_META_SIZE(pNode->pMeta)); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkRealTableMeta, tableMetaToJson, pNode->pMeta); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkRealTableVgroupsInfoSize, VGROUPS_INFO_SIZE(pNode->pVgroupList)); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkRealTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList); + } + + return code; +} + +static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) { + SRealTableNode* pNode = (SRealTableNode*)pObj; + + int32_t objSize = 0; + int32_t code = jsonToTableNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkRealTableMetaSize, &objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonMakeObject(pJson, jkRealTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkRealTableVgroupsInfoSize, &objSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonMakeObject(pJson, jkRealTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize); + } + + return code; +} + static const char* jkGroupingSetType = "GroupingSetType"; static const char* jkGroupingSetParameter = "Parameters"; @@ -1460,7 +1666,7 @@ static const char* jkSelectStmtSlimit = "Slimit"; static int32_t selectStmtTojson(const void* pObj, SJson* pJson) { const SSelectStmt* pNode = (const SSelectStmt*)pObj; - int32_t code = tjsonAddIntegerToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct); + int32_t code = tjsonAddBoolToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct); if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkSelectStmtProjections, pNode->pProjectionList); } @@ -1495,6 +1701,44 @@ static int32_t selectStmtTojson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) { + SSelectStmt* pNode = (SSelectStmt*)pObj; + + int32_t code = tjsonGetBoolValue(pJson, jkSelectStmtDistinct, &pNode->isDistinct); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSelectStmtProjections, &pNode->pProjectionList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtFrom, &pNode->pFromTable); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtWhere, &pNode->pWhere); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSelectStmtPartitionBy, &pNode->pPartitionByList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtWindow, &pNode->pWindow); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSelectStmtGroupBy, &pNode->pGroupByList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtHaving, &pNode->pHaving); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkSelectStmtOrderBy, &pNode->pOrderByList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtLimit, &pNode->pLimit); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtSlimit, &pNode->pSlimit); + } + + return code; +} + static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { switch (nodeType(pObj)) { case QUERY_NODE_COLUMN: @@ -1508,6 +1752,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_FUNCTION: return functionNodeToJson(pObj, pJson); case QUERY_NODE_REAL_TABLE: + return realTableNodeToJson(pObj, pJson); case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_JOIN_TABLE: break; @@ -1561,9 +1806,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiTagScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: return physiTableScanNodeToJson(pObj, pJson); - case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: - break; + return physiStreamScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return physiProjectNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_JOIN: @@ -1585,6 +1829,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN: return planToJson(pObj, pJson); default: + assert(0); break; } nodesWarn("specificNodeToJson unknown node = %s", nodesNodeName(nodeType(pObj))); @@ -1603,7 +1848,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicConditionNode(pJson, pObj); case QUERY_NODE_FUNCTION: return jsonToFunctionNode(pJson, pObj); - // case QUERY_NODE_REAL_TABLE: + case QUERY_NODE_REAL_TABLE: + return jsonToRealTableNode(pJson, pObj); // case QUERY_NODE_TEMP_TABLE: // case QUERY_NODE_JOIN_TABLE: // break; @@ -1629,8 +1875,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToDownstreamSourceNode(pJson, pObj); // case QUERY_NODE_SET_OPERATOR: // break; - // case QUERY_NODE_SELECT_STMT: - // return jsonToSelectStmt(pJson, pObj); + case QUERY_NODE_SELECT_STMT: + return jsonToSelectStmt(pJson, pObj); // case QUERY_NODE_LOGIC_PLAN_SCAN: // return jsonToLogicScanNode(pJson, pObj); // case QUERY_NODE_LOGIC_PLAN_JOIN: @@ -1643,6 +1889,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: return jsonToPhysiTableScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: + return jsonToPhysiStreamScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return jsonToPhysiProjectNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_JOIN: @@ -1764,6 +2012,7 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode) { int32_t code = makeNodeByJson(pJson, pNode); if (TSDB_CODE_SUCCESS != code) { nodesDestroyNode(*pNode); + *pNode = NULL; terrno = code; return code; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 48b9c3e094..0d48202f73 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -170,7 +170,7 @@ SNodeptr nodesMakeNode(ENodeType type) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: return makeNode(type, sizeof(STableSeqScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: - return makeNode(type, sizeof(SNode)); + return makeNode(type, sizeof(SStreamScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_JOIN: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 03206e89cb..171ec81ca2 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -126,7 +126,7 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode* pTable, SNodeList* pList) { if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; - int32_t nums = pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType)? pMeta->tableInfo.numOfTags:0); + int32_t nums = pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType) ? pMeta->tableInfo.numOfTags : 0); for (int32_t i = 0; i < nums; ++i) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { @@ -499,6 +499,10 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) } static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) { + if (pCxt->streamQuery) { + return TSDB_CODE_SUCCESS; + } + if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) { SArray* vgroupList = NULL; int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList); @@ -1389,6 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p SCMCreateTopicReq createReq = {0}; if (NULL != pStmt->pQuery) { + pCxt->pParseCxt->streamQuery = true; int32_t code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index fd03eacf3f..93615b0565 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*); TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*); - pScan->scanType = SCAN_TYPE_TABLE; + pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE; pScan->scanFlag = MAIN_SCAN; pScan->scanRange = TSWINDOW_INITIALIZER; pScan->tableName.type = TSDB_TABLE_NAME_T; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0affd93f4d..9bd926b676 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -259,15 +259,21 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p return (SPhysiNode*)pTableScan; } +static SPhysiNode* createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) { + SStreamScanPhysiNode* pTableScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); + CHECK_ALLOC(pTableScan, NULL); + CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan); + return (SPhysiNode*)pTableScan; +} + static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) { switch (pScanLogicNode->scanType) { case SCAN_TYPE_TAG: return createTagScanPhysiNode(pCxt, pScanLogicNode); case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); - case SCAN_TYPE_STABLE: case SCAN_TYPE_STREAM: - break; + return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode); default: break; } diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 0a4a1a07a6..a85da8cbbf 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -202,9 +202,16 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV return (errno == ERANGE||errno == EINVAL) ? TSDB_CODE_FAILED:TSDB_CODE_SUCCESS; } +int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) { + uint64_t val = 0; + int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { uint64_t val = 0; - int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); + int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); *pVal = val; return code; } @@ -239,6 +246,22 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi return func(pJsonObj, pObj); } +int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize) { + if (objSize <= 0) { + return TSDB_CODE_SUCCESS; + } + + SJson* pJsonObj = tjsonGetObjectItem(pJson, pName); + if (NULL == pJsonObj) { + return TSDB_CODE_FAILED; + } + *pObj = calloc(1, objSize); + if (NULL == *pObj) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return func(pJsonObj, *pObj); +} + int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize) { const cJSON* jArray = tjsonGetObjectItem(pJson, pName); int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray));