tmq plan implement
This commit is contained in:
parent
05f7532ee0
commit
c2b4ec91ff
|
@ -27,7 +27,7 @@ extern "C" {
|
|||
typedef int32_t VarDataOffsetT;
|
||||
typedef uint32_t TDRowLenT;
|
||||
typedef uint8_t TDRowValT;
|
||||
typedef uint16_t col_id_t;
|
||||
typedef int16_t col_id_t;
|
||||
typedef int8_t col_type_t;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
|
|
@ -166,6 +166,7 @@ typedef struct SScanPhysiNode {
|
|||
|
||||
typedef SScanPhysiNode SSystemTableScanPhysiNode;
|
||||
typedef SScanPhysiNode STagScanPhysiNode;
|
||||
typedef SScanPhysiNode SStreamScanPhysiNode;
|
||||
|
||||
typedef struct STableScanPhysiNode {
|
||||
SScanPhysiNode scan;
|
||||
|
|
|
@ -23,6 +23,9 @@ extern "C" {
|
|||
#include "nodes.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags) * sizeof(SSchema)))
|
||||
#define VGROUPS_INFO_SIZE(pInfo) (NULL == (pInfo) ? 0 : (sizeof(SVgroupsInfo) + (pInfo)->numOfVgroups * sizeof(SVgroupInfo)))
|
||||
|
||||
typedef struct SRawExprNode {
|
||||
ENodeType nodeType;
|
||||
char* p;
|
||||
|
|
|
@ -26,6 +26,7 @@ typedef struct SParseContext {
|
|||
uint64_t requestId;
|
||||
int32_t acctId;
|
||||
const char *db;
|
||||
bool streamQuery;
|
||||
void *pTransporter;
|
||||
SEpSet mgmtEpSet;
|
||||
const char *pSql; // sql string
|
||||
|
|
|
@ -44,6 +44,7 @@ int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal);
|
|||
int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pVal);
|
||||
int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal);
|
||||
int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal);
|
||||
int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal);
|
||||
int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal);
|
||||
int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal);
|
||||
int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal);
|
||||
|
@ -60,6 +61,7 @@ int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void*
|
|||
typedef int32_t (*FToObject)(const SJson* pJson, void* pObj);
|
||||
|
||||
int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj);
|
||||
int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize);
|
||||
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize);
|
||||
|
||||
char* tjsonToString(const SJson* pJson);
|
||||
|
|
|
@ -179,6 +179,7 @@ typedef struct SRequestObj {
|
|||
uint64_t requestId;
|
||||
int32_t type; // request type
|
||||
STscObj* pTscObj;
|
||||
char* pDb;
|
||||
char* sqlstr; // sql string
|
||||
int32_t sqlLen;
|
||||
int64_t self;
|
||||
|
@ -229,7 +230,7 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
|
|||
|
||||
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery);
|
||||
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery);
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||
|
||||
// --- heartbeat
|
||||
|
|
|
@ -150,6 +150,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pRequest->pDb = getDbOfConnection(pObj);
|
||||
pRequest->requestId = generateRequestId();
|
||||
pRequest->metric.start = taosGetTimestampMs();
|
||||
|
||||
|
@ -180,6 +181,7 @@ static void doDestroyRequest(void *p) {
|
|||
tfree(pRequest->msgBuf);
|
||||
tfree(pRequest->sqlstr);
|
||||
tfree(pRequest->pInfo);
|
||||
tfree(pRequest->pDb);
|
||||
|
||||
doFreeReqResultInfo(&pRequest->body.resInfo);
|
||||
qDestroyQueryPlan(pRequest->body.pDag);
|
||||
|
|
|
@ -137,13 +137,14 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
|
||||
int32_t parseSql(SRequestObj* pRequest, bool streamQuery, SQuery** pQuery) {
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
SParseContext cxt = {
|
||||
.requestId = pRequest->requestId,
|
||||
.acctId = pTscObj->acctId,
|
||||
.db = getDbOfConnection(pTscObj),
|
||||
.db = pRequest->pDb,
|
||||
.streamQuery = streamQuery,
|
||||
.pSql = pRequest->sqlstr,
|
||||
.sqlLen = pRequest->sqlLen,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
|
@ -154,7 +155,6 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
|
|||
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tfree(cxt.db);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -163,7 +163,6 @@ int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
|
|||
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
|
||||
}
|
||||
|
||||
tfree(cxt.db);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -249,7 +248,7 @@ TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
|
|||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return);
|
||||
|
||||
if (pQuery->directRpc) {
|
||||
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
||||
|
|
|
@ -482,38 +482,24 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
}
|
||||
|
||||
tscDebug("start to create topic, %s", topicName);
|
||||
#if 0
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
|
||||
|
||||
pQueryNode->streamQuery = true;
|
||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
||||
CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
|
||||
|
||||
// todo check for invalid sql statement and return with error code
|
||||
|
||||
SSchema* schema = NULL;
|
||||
int32_t numOfCols = 0;
|
||||
CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, NULL), _return);
|
||||
|
||||
pStr = qQueryPlanToString(pRequest->body.pDag);
|
||||
if (pStr == NULL) {
|
||||
goto _return;
|
||||
}
|
||||
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &pStr, NULL), _return);
|
||||
|
||||
/*printf("%s\n", pStr);*/
|
||||
|
||||
// The topic should be related to a database that the queried table is belonged to.
|
||||
SName name = {0};
|
||||
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
||||
// tNameGetFullDbName(&((SQueryStmtInfo*)pQueryNode)->pTableMetaInfo[0]->name, dbName);
|
||||
|
||||
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||
SName name = { .acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T };
|
||||
strcpy(name.dbname, pRequest->pDb);
|
||||
strcpy(name.tname, topicName);
|
||||
|
||||
SCMCreateTopicReq req = {
|
||||
.igExists = 1,
|
||||
.physicalPlan = (char*)pStr,
|
||||
.ast = (char*)pStr,
|
||||
.sql = (char*)sql,
|
||||
.logicalPlan = (char*)"no logic plan",
|
||||
};
|
||||
tNameExtractFullName(&name, req.name);
|
||||
|
||||
|
@ -536,7 +522,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
#endif
|
||||
|
||||
_return:
|
||||
qDestroyQuery(pQueryNode);
|
||||
/*if (sendInfo != NULL) {*/
|
||||
|
|
|
@ -1072,6 +1072,10 @@ void blockDataClearup(SSDataBlock* pDataBlock) {
|
|||
}
|
||||
|
||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
if (0 == numOfRows) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
||||
char* tmp = realloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
|
||||
if (tmp == NULL) {
|
||||
|
@ -1092,7 +1096,7 @@ int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRo
|
|||
|
||||
pColumn->nullbitmap = tmp;
|
||||
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
|
||||
|
||||
assert(pColumn->info.bytes);
|
||||
tmp = realloc(pColumn->pData, numOfRows * pColumn->info.bytes);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1137,7 +1141,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
|
|||
|
||||
taosArrayDestroy(pBlock->pDataBlock);
|
||||
tfree(pBlock->pBlockAgg);
|
||||
tfree(pBlock);
|
||||
// tfree(pBlock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1190,7 +1194,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t len = colDataGetLength(pColData, rows);
|
||||
taosEncodeFixedI32(buf, len);
|
||||
tlen += taosEncodeFixedI32(buf, len);
|
||||
|
||||
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tdatablock.h"
|
||||
#include "vnode.h"
|
||||
|
||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||
|
@ -128,10 +129,13 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
|
||||
int j = 0;
|
||||
for (int32_t i = 0; i < colNumNeed; i++) {
|
||||
int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
|
||||
int16_t colId = *(int16_t*)taosArrayGet(pHandle->pColIdList, i);
|
||||
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
|
||||
j++;
|
||||
}
|
||||
if (j >= pSchemaWrapper->nCols) {
|
||||
continue;
|
||||
}
|
||||
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
|
||||
SColumnInfoData colInfo = {0};
|
||||
int sz = numOfRows * pColSchema->bytes;
|
||||
|
@ -145,6 +149,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
|
|||
taosArrayDestroy(pArray);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureColumnCapacity(&colInfo, numOfRows);
|
||||
taosArrayPush(pArray, &colInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -197,7 +197,7 @@ static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
|
|||
}
|
||||
|
||||
static STableMeta* tableMetaClone(const STableMeta* pSrc) {
|
||||
int32_t len = sizeof(STableMeta) + (pSrc->tableInfo.numOfTags + pSrc->tableInfo.numOfColumns) * sizeof(SSchema);
|
||||
int32_t len = TABLE_META_SIZE(pSrc);
|
||||
STableMeta* pDst = malloc(len);
|
||||
if (NULL == pDst) {
|
||||
return NULL;
|
||||
|
@ -207,7 +207,7 @@ static STableMeta* tableMetaClone(const STableMeta* pSrc) {
|
|||
}
|
||||
|
||||
static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) {
|
||||
int32_t len = sizeof(SVgroupsInfo) + pSrc->numOfVgroups * sizeof(SVgroupInfo);
|
||||
int32_t len = VGROUPS_INFO_SIZE(pSrc);
|
||||
SVgroupsInfo* pDst = malloc(len);
|
||||
if (NULL == pDst) {
|
||||
return NULL;
|
||||
|
|
|
@ -193,6 +193,17 @@ static int32_t tableMetaToJson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) {
|
||||
STableMeta* pNode = (STableMeta*)pObj;
|
||||
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, jkTableMetaUid, &pNode->uid);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkTableMetaSuid, &pNode->suid);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkLogicPlanTargets = "Targets";
|
||||
static const char* jkLogicPlanConditions = "Conditions";
|
||||
static const char* jkLogicPlanChildren = "Children";
|
||||
|
@ -441,6 +452,14 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t physiStreamScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
return physiScanNodeToJson(pObj, pJson);
|
||||
}
|
||||
|
||||
static int32_t jsonToPhysiStreamScanNode(const SJson* pJson, void* pObj) {
|
||||
return jsonToPhysiScanNode(pJson, pObj);
|
||||
}
|
||||
|
||||
static const char* jkProjectPhysiPlanProjections = "Projections";
|
||||
|
||||
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -1269,6 +1288,193 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkTableDbName = "DbName";
|
||||
static const char* jkTableTableName = "tableName";
|
||||
static const char* jkTableTableAlias = "tableAlias";
|
||||
|
||||
static int32_t tableNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableNode* pNode = (const STableNode*)pObj;
|
||||
|
||||
int32_t code = exprNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkTableDbName, pNode->dbName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkTableTableName, pNode->tableName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddStringToObject(pJson, jkTableTableAlias, pNode->tableAlias);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToTableNode(const SJson* pJson, void* pObj) {
|
||||
STableNode* pNode = (STableNode*)pObj;
|
||||
|
||||
int32_t code = jsonToExprNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkTableDbName, pNode->dbName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkTableTableName, pNode->tableName);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetStringValue(pJson, jkTableTableAlias, pNode->tableAlias);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkEpSetInUse = "InUse";
|
||||
static const char* jkEpSetNumOfEps = "NumOfEps";
|
||||
static const char* jkEpSetEps = "Eps";
|
||||
|
||||
static int32_t epSetToJson(const void* pObj, SJson* pJson) {
|
||||
const SEpSet* pNode = (const SEpSet*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkEpSetInUse, pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkEpSetNumOfEps, pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddArray(pJson, jkEpSetEps, epToJson, pNode->eps, sizeof(SEp), pNode->numOfEps);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToEpSet(const SJson* pJson, void* pObj) {
|
||||
SEpSet* pNode = (SEpSet*)pObj;
|
||||
|
||||
int32_t code = tjsonGetTinyIntValue(pJson, jkEpSetInUse, &pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetTinyIntValue(pJson, jkEpSetNumOfEps, &pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToArray(pJson, jkEpSetEps, jsonToEp, pNode->eps, sizeof(SEp));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkVgroupInfoVgId = "VgId";
|
||||
static const char* jkVgroupInfoHashBegin = "HashBegin";
|
||||
static const char* jkVgroupInfoHashEnd = "HashEnd";
|
||||
static const char* jkVgroupInfoEpSet = "EpSet";
|
||||
static const char* jkVgroupInfoNumOfTable = "NumOfTable";
|
||||
|
||||
static int32_t vgroupInfoToJson(const void* pObj, SJson* pJson) {
|
||||
const SVgroupInfo* pNode = (const SVgroupInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkVgroupInfoVgId, pNode->vgId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkVgroupInfoHashBegin, pNode->hashBegin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkVgroupInfoHashEnd, pNode->hashEnd);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkVgroupInfoEpSet, epSetToJson, &pNode->epSet);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkVgroupInfoNumOfTable, pNode->numOfTable);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToVgroupInfo(const SJson* pJson, void* pObj) {
|
||||
SVgroupInfo* pNode = (SVgroupInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonGetIntValue(pJson, jkVgroupInfoVgId, &pNode->vgId);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUIntValue(pJson, jkVgroupInfoHashBegin, &pNode->hashBegin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUIntValue(pJson, jkVgroupInfoHashEnd, &pNode->hashEnd);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToObject(pJson, jkVgroupInfoEpSet, jsonToEpSet, &pNode->epSet);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkVgroupInfoNumOfTable, &pNode->numOfTable);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkVgroupsInfoNum = "Num";
|
||||
static const char* jkVgroupsInfoVgroups = "Vgroups";
|
||||
|
||||
static int32_t vgroupsInfoToJson(const void* pObj, SJson* pJson) {
|
||||
const SVgroupsInfo* pNode = (const SVgroupsInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkVgroupsInfoNum, pNode->numOfVgroups);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddArray(pJson, jkVgroupsInfoVgroups, vgroupInfoToJson, pNode->vgroups, sizeof(SVgroupInfo), pNode->numOfVgroups);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToVgroupsInfo(const SJson* pJson, void* pObj) {
|
||||
SVgroupsInfo* pNode = (SVgroupsInfo*)pObj;
|
||||
|
||||
int32_t code = tjsonGetIntValue(pJson, jkVgroupsInfoNum, &pNode->numOfVgroups);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToArray(pJson, jkVgroupsInfoVgroups, jsonToVgroupInfo, pNode->vgroups, sizeof(SVgroupInfo));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkRealTableMetaSize = "MetaSize";
|
||||
static const char* jkRealTableMeta = "Meta";
|
||||
static const char* jkRealTableVgroupsInfoSize = "VgroupsInfoSize";
|
||||
static const char* jkRealTableVgroupsInfo = "VgroupsInfo";
|
||||
|
||||
static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SRealTableNode* pNode = (const SRealTableNode*)pObj;
|
||||
|
||||
int32_t code = tableNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkRealTableMetaSize, TABLE_META_SIZE(pNode->pMeta));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkRealTableMeta, tableMetaToJson, pNode->pMeta);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkRealTableVgroupsInfoSize, VGROUPS_INFO_SIZE(pNode->pVgroupList));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkRealTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) {
|
||||
SRealTableNode* pNode = (SRealTableNode*)pObj;
|
||||
|
||||
int32_t objSize = 0;
|
||||
int32_t code = jsonToTableNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkRealTableMetaSize, &objSize);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonMakeObject(pJson, jkRealTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkRealTableVgroupsInfoSize, &objSize);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonMakeObject(pJson, jkRealTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkGroupingSetType = "GroupingSetType";
|
||||
static const char* jkGroupingSetParameter = "Parameters";
|
||||
|
||||
|
@ -1460,7 +1666,7 @@ static const char* jkSelectStmtSlimit = "Slimit";
|
|||
static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
||||
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct);
|
||||
int32_t code = tjsonAddBoolToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkSelectStmtProjections, pNode->pProjectionList);
|
||||
}
|
||||
|
@ -1495,6 +1701,44 @@ static int32_t selectStmtTojson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
|
||||
SSelectStmt* pNode = (SSelectStmt*)pObj;
|
||||
|
||||
int32_t code = tjsonGetBoolValue(pJson, jkSelectStmtDistinct, &pNode->isDistinct);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtProjections, &pNode->pProjectionList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtFrom, &pNode->pFromTable);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtWhere, &pNode->pWhere);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtPartitionBy, &pNode->pPartitionByList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtWindow, &pNode->pWindow);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtGroupBy, &pNode->pGroupByList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtHaving, &pNode->pHaving);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkSelectStmtOrderBy, &pNode->pOrderByList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtLimit, &pNode->pLimit);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkSelectStmtSlimit, &pNode->pSlimit);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||
switch (nodeType(pObj)) {
|
||||
case QUERY_NODE_COLUMN:
|
||||
|
@ -1508,6 +1752,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_FUNCTION:
|
||||
return functionNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return realTableNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
case QUERY_NODE_JOIN_TABLE:
|
||||
break;
|
||||
|
@ -1561,9 +1806,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return physiTagScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return physiTableScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
break;
|
||||
return physiStreamScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return physiProjectNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -1585,6 +1829,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN:
|
||||
return planToJson(pObj, pJson);
|
||||
default:
|
||||
assert(0);
|
||||
break;
|
||||
}
|
||||
nodesWarn("specificNodeToJson unknown node = %s", nodesNodeName(nodeType(pObj)));
|
||||
|
@ -1603,7 +1848,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToLogicConditionNode(pJson, pObj);
|
||||
case QUERY_NODE_FUNCTION:
|
||||
return jsonToFunctionNode(pJson, pObj);
|
||||
// case QUERY_NODE_REAL_TABLE:
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return jsonToRealTableNode(pJson, pObj);
|
||||
// case QUERY_NODE_TEMP_TABLE:
|
||||
// case QUERY_NODE_JOIN_TABLE:
|
||||
// break;
|
||||
|
@ -1629,8 +1875,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToDownstreamSourceNode(pJson, pObj);
|
||||
// case QUERY_NODE_SET_OPERATOR:
|
||||
// break;
|
||||
// case QUERY_NODE_SELECT_STMT:
|
||||
// return jsonToSelectStmt(pJson, pObj);
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return jsonToSelectStmt(pJson, pObj);
|
||||
// case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
// return jsonToLogicScanNode(pJson, pObj);
|
||||
// case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
|
@ -1643,6 +1889,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToPhysiTagScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return jsonToPhysiTableScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
return jsonToPhysiStreamScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return jsonToPhysiProjectNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -1764,6 +2012,7 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode) {
|
|||
int32_t code = makeNodeByJson(pJson, pNode);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = NULL;
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
return makeNode(type, sizeof(STableSeqScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
return makeNode(type, sizeof(SNode));
|
||||
return makeNode(type, sizeof(SStreamScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
|
|
@ -126,7 +126,7 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol
|
|||
static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode* pTable, SNodeList* pList) {
|
||||
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
|
||||
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
|
||||
int32_t nums = pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType)? pMeta->tableInfo.numOfTags:0);
|
||||
int32_t nums = pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType) ? pMeta->tableInfo.numOfTags : 0);
|
||||
for (int32_t i = 0; i < nums; ++i) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
|
@ -499,6 +499,10 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
}
|
||||
|
||||
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
|
||||
if (pCxt->streamQuery) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
|
||||
SArray* vgroupList = NULL;
|
||||
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList);
|
||||
|
@ -1389,6 +1393,7 @@ static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* p
|
|||
SCMCreateTopicReq createReq = {0};
|
||||
|
||||
if (NULL != pStmt->pQuery) {
|
||||
pCxt->pParseCxt->streamQuery = true;
|
||||
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
|
||||
|
|
|
@ -131,7 +131,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
|
||||
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
|
||||
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
|
||||
pScan->scanType = SCAN_TYPE_TABLE;
|
||||
pScan->scanType = pCxt->pPlanCxt->streamQuery ? SCAN_TYPE_STREAM : SCAN_TYPE_TABLE;
|
||||
pScan->scanFlag = MAIN_SCAN;
|
||||
pScan->scanRange = TSWINDOW_INITIALIZER;
|
||||
pScan->tableName.type = TSDB_TABLE_NAME_T;
|
||||
|
|
|
@ -259,15 +259,21 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p
|
|||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
SStreamScanPhysiNode* pTableScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
CHECK_ALLOC(pTableScan, NULL);
|
||||
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
|
||||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return createTagScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_TABLE:
|
||||
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
|
||||
case SCAN_TYPE_STABLE:
|
||||
case SCAN_TYPE_STREAM:
|
||||
break;
|
||||
return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -202,6 +202,13 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV
|
|||
return (errno == ERANGE||errno == EINVAL) ? TSDB_CODE_FAILED:TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) {
|
||||
uint64_t val = 0;
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
|
||||
*pVal = val;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) {
|
||||
uint64_t val = 0;
|
||||
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
|
||||
|
@ -239,6 +246,22 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi
|
|||
return func(pJsonObj, pObj);
|
||||
}
|
||||
|
||||
int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize) {
|
||||
if (objSize <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SJson* pJsonObj = tjsonGetObjectItem(pJson, pName);
|
||||
if (NULL == pJsonObj) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
*pObj = calloc(1, objSize);
|
||||
if (NULL == *pObj) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return func(pJsonObj, *pObj);
|
||||
}
|
||||
|
||||
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize) {
|
||||
const cJSON* jArray = tjsonGetObjectItem(pJson, pName);
|
||||
int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray));
|
||||
|
|
Loading…
Reference in New Issue