Merge remote-tracking branch 'origin/3.0_query_integrate' into feature/scheduler
This commit is contained in:
commit
8f1cc6ee91
|
@ -30,12 +30,13 @@ typedef int64_t tb_uid_t;
|
|||
#define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX))
|
||||
|
||||
typedef enum {
|
||||
TSDB_SUPER_TABLE = 1, // super table
|
||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||
TSDB_TABLE_MAX = 6
|
||||
TSDB_SUPER_TABLE = 1, // super table
|
||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||
TSDB_SYSTEM_TABLE = 6,
|
||||
TSDB_TABLE_MAX = 7
|
||||
} ETableType;
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -879,6 +879,17 @@ typedef struct {
|
|||
char data[];
|
||||
} SRetrieveTableRsp;
|
||||
|
||||
typedef struct {
|
||||
int64_t handle;
|
||||
int64_t useconds;
|
||||
int8_t completed; // all results are returned to client
|
||||
int8_t precision;
|
||||
int8_t compressed;
|
||||
int32_t compLen;
|
||||
int32_t numOfRows;
|
||||
char data[];
|
||||
} SRetrieveMetaTableRsp;
|
||||
|
||||
typedef struct {
|
||||
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
||||
int32_t port;
|
||||
|
|
|
@ -163,7 +163,7 @@ typedef struct SInputColumnInfoData {
|
|||
typedef struct SqlFunctionCtx {
|
||||
SInputColumnInfoData input;
|
||||
SResultDataInfo resDataInfo;
|
||||
uint32_t order; // asc|desc
|
||||
uint32_t order; // data block scanner order: asc|desc
|
||||
////////////////////////////////////////////////////////////////
|
||||
int32_t startRow; // start row index
|
||||
int32_t size; // handled processed row number
|
||||
|
|
|
@ -36,7 +36,7 @@ typedef struct SLogicNode {
|
|||
typedef enum EScanType {
|
||||
SCAN_TYPE_TAG,
|
||||
SCAN_TYPE_TABLE,
|
||||
SCAN_TYPE_STABLE,
|
||||
SCAN_TYPE_SYSTEM_TABLE,
|
||||
SCAN_TYPE_STREAM
|
||||
} EScanType;
|
||||
|
||||
|
@ -147,9 +147,13 @@ typedef struct SScanPhysiNode {
|
|||
SName tableName;
|
||||
} SScanPhysiNode;
|
||||
|
||||
typedef SScanPhysiNode SSystemTableScanPhysiNode;
|
||||
typedef SScanPhysiNode STagScanPhysiNode;
|
||||
|
||||
typedef struct SSystemTableScanPhysiNode {
|
||||
SScanPhysiNode scan;
|
||||
SEpSet mgmtEpSet;
|
||||
} SSystemTableScanPhysiNode;
|
||||
|
||||
typedef struct STableScanPhysiNode {
|
||||
SScanPhysiNode scan;
|
||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
typedef struct SPlanContext {
|
||||
uint64_t queryId;
|
||||
int32_t acctId;
|
||||
SEpSet mgmtEpSet;
|
||||
SNode* pAstRoot;
|
||||
} SPlanContext;
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ extern const int32_t TYPE_BYTES[15];
|
|||
#define TSDB_INS_TABLE_MNODES "mnodes"
|
||||
#define TSDB_INS_TABLE_MODULES "modules"
|
||||
#define TSDB_INS_TABLE_QNODES "qnodes"
|
||||
#define TSDB_INS_TABLE_USER_DATABASE "user_database"
|
||||
#define TSDB_INS_TABLE_USER_DATABASES "user_databases"
|
||||
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
|
||||
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
||||
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
|
||||
|
|
|
@ -194,7 +194,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId };
|
||||
SPlanContext cxt = {
|
||||
.queryId = pRequest->requestId,
|
||||
.acctId = pRequest->pTscObj->acctId,
|
||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||
.pAstRoot = pQuery->pRoot
|
||||
};
|
||||
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
|
|
|
@ -1732,6 +1732,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
|
|||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->type) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->free) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -1746,6 +1747,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
|
|||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->type) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->free) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
|
|
@ -102,6 +102,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
|||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYSTABLE_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
|
||||
|
|
|
@ -1342,123 +1342,149 @@ char *mnGetDbStr(char *src) {
|
|||
return pos;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||
static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
|
||||
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
|
||||
}
|
||||
|
||||
static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity, int64_t numOfTables) {
|
||||
int32_t cols = 0;
|
||||
|
||||
char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
char *name = mnGetDbStr(pDb->name);
|
||||
if (name != NULL) {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
||||
} else {
|
||||
STR_TO_VARSTR(pWrite, "NULL");
|
||||
}
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int64_t *)pWrite = pDb->createdTime;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int64_t *)pWrite = numOfTables;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int16_t *)pWrite = pDb->cfg.replications;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int16_t *)pWrite = pDb->cfg.quorum;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int16_t *)pWrite = pDb->cfg.daysPerFile;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
char tmp[128] = {0};
|
||||
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
|
||||
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
|
||||
} else {
|
||||
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int32_t *)pWrite = pDb->cfg.minRows;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int32_t *)pWrite = pDb->cfg.maxRows;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int8_t *)pWrite = pDb->cfg.walLevel;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int8_t *)pWrite = pDb->cfg.compression;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
char *prec = NULL;
|
||||
switch (pDb->cfg.precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI:
|
||||
prec = TSDB_TIME_PRECISION_MILLI_STR;
|
||||
break;
|
||||
case TSDB_TIME_PRECISION_MICRO:
|
||||
prec = TSDB_TIME_PRECISION_MICRO_STR;
|
||||
break;
|
||||
case TSDB_TIME_PRECISION_NANO:
|
||||
prec = TSDB_TIME_PRECISION_NANO_STR;
|
||||
break;
|
||||
default:
|
||||
prec = "none";
|
||||
break;
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||
cols++;
|
||||
|
||||
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||
*(int8_t *)pWrite = pDb->cfg.update;
|
||||
}
|
||||
|
||||
static void setInformationSchemaDbCfg(SDbObj* pDbObj) {
|
||||
ASSERT(pDbObj != NULL);
|
||||
strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name));
|
||||
|
||||
pDbObj->createdTime = 0;
|
||||
pDbObj->cfg.numOfVgroups = 0;
|
||||
pDbObj->cfg.quorum = 1;
|
||||
pDbObj->cfg.replications = 1;
|
||||
pDbObj->cfg.update = 1;
|
||||
pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SDbObj *pDb = NULL;
|
||||
char *pWrite;
|
||||
int32_t cols = 0;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
char *name = mnGetDbStr(pDb->name);
|
||||
if (name != NULL) {
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
||||
} else {
|
||||
STR_TO_VARSTR(pWrite, "NULL");
|
||||
if (pShow->pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int64_t *)pWrite = pDb->createdTime;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = 0; // todo
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDb->cfg.replications;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDb->cfg.quorum;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDb->cfg.daysPerFile;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
char tmp[128] = {0};
|
||||
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
|
||||
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
|
||||
} else {
|
||||
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.minRows;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.maxRows;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int8_t *)pWrite = pDb->cfg.walLevel;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int8_t *)pWrite = pDb->cfg.compression;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
char *prec = NULL;
|
||||
switch (pDb->cfg.precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI:
|
||||
prec = TSDB_TIME_PRECISION_MILLI_STR;
|
||||
break;
|
||||
case TSDB_TIME_PRECISION_MICRO:
|
||||
prec = TSDB_TIME_PRECISION_MICRO_STR;
|
||||
break;
|
||||
case TSDB_TIME_PRECISION_NANO:
|
||||
prec = TSDB_TIME_PRECISION_NANO_STR;
|
||||
break;
|
||||
default:
|
||||
prec = "none";
|
||||
break;
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int8_t *)pWrite = pDb->cfg.update;
|
||||
cols++;
|
||||
|
||||
dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity, 0);
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pDb);
|
||||
}
|
||||
|
||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||
// Append the information_schema database into the result.
|
||||
if (numOfRows < rowsCapacity) {
|
||||
SDbObj dummyISDb = {0};
|
||||
setInformationSchemaDbCfg(&dummyISDb);
|
||||
dumpDbInfoToPayload(data, &dummyISDb, pShow, numOfRows, rowsCapacity, 14);
|
||||
numOfRows += 1;
|
||||
}
|
||||
|
||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow);
|
||||
pShow->numOfReads += numOfRows;
|
||||
|
||||
return numOfRows;
|
||||
|
|
|
@ -39,24 +39,24 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt
|
|||
{.name = "end_point", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
};
|
||||
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
||||
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "vgroups", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "replica", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "quorum", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "days", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "keep", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||
{.name = "replica", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||
{.name = "quorum", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||
{.name = "days", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||
{.name = "keep", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "cache", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "wallevel", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "wallevel", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "comp", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "cachelast", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
{.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||
};
|
||||
static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
||||
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
|
@ -128,7 +128,7 @@ static const SInfosTableMeta infosMeta[] = {{TSDB_INS_TABLE_DNODES, dnodesSchema
|
|||
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
|
||||
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
|
||||
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
|
||||
{TSDB_INS_TABLE_USER_DATABASE, userDBSchema, tListLen(userDBSchema)},
|
||||
{TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)},
|
||||
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
||||
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
||||
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
|
||||
|
@ -165,7 +165,7 @@ int32_t mndInsInitMeta(SHashObj *hash) {
|
|||
STableMetaRsp meta = {0};
|
||||
|
||||
strcpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB);
|
||||
meta.tableType = TSDB_NORMAL_TABLE;
|
||||
meta.tableType = TSDB_SYSTEM_TABLE;
|
||||
meta.sversion = 1;
|
||||
meta.tversion = 1;
|
||||
|
||||
|
|
|
@ -284,6 +284,20 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
|
|||
strncpy(req.db, retrieveReq.db, tListLen(req.db));
|
||||
|
||||
pShow = mndCreateShowObj(pMnode, &req);
|
||||
STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, TSDB_INS_TABLE_USER_DATABASES, strlen(TSDB_INS_TABLE_USER_DATABASES));
|
||||
pShow->numOfRows = 100;
|
||||
|
||||
int32_t offset = 0;
|
||||
for(int32_t i = 0; i < meta->numOfColumns; ++i) {
|
||||
pShow->numOfColumns = meta->numOfColumns;
|
||||
pShow->offset[i] = offset;
|
||||
|
||||
int32_t bytes = meta->pSchemas[i].bytes;
|
||||
pShow->rowSize += bytes;
|
||||
pShow->bytes[i] = bytes;
|
||||
offset += bytes;
|
||||
}
|
||||
|
||||
if (pShow == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed to process show-meta req since %s", terrstr());
|
||||
|
@ -330,7 +344,7 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
|
|||
size = pShow->rowSize * rowsToRead;
|
||||
|
||||
size += SHOW_STEP_SIZE;
|
||||
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
|
||||
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||
if (pRsp == NULL) {
|
||||
mndReleaseShowObj((SShowObj*) pShow, false);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -338,6 +352,8 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pRsp->handle = htobe64(pShow->id);
|
||||
|
||||
// if free flag is set, client wants to clean the resources
|
||||
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
|
||||
|
|
|
@ -365,28 +365,6 @@ typedef struct SQInfo {
|
|||
STaskCostInfo summary;
|
||||
} SQInfo;
|
||||
|
||||
typedef struct STaskParam {
|
||||
char* sql;
|
||||
char* tagCond;
|
||||
char* colCond;
|
||||
char* tbnameCond;
|
||||
char* prevResult;
|
||||
SArray* pTableIdList;
|
||||
SExprBasicInfo** pExpr;
|
||||
SExprBasicInfo** pSecExpr;
|
||||
SExprInfo* pExprs;
|
||||
SExprInfo* pSecExprs;
|
||||
|
||||
SFilterInfo* pFilters;
|
||||
|
||||
SColIndex* pGroupColIndex;
|
||||
SColumnInfo* pTagColumnInfo;
|
||||
SGroupbyExpr* pGroupbyExpr;
|
||||
int32_t tableScanOperator;
|
||||
SArray* pOperator;
|
||||
struct SUdfInfo* pUdfInfo;
|
||||
} STaskParam;
|
||||
|
||||
enum {
|
||||
EX_SOURCE_DATA_NOT_READY = 0x1,
|
||||
EX_SOURCE_DATA_READY = 0x2,
|
||||
|
@ -401,6 +379,12 @@ typedef struct SSourceDataInfo {
|
|||
int32_t status;
|
||||
} SSourceDataInfo;
|
||||
|
||||
typedef struct SLoadRemoteDataInfo {
|
||||
uint64_t totalSize; // total load bytes from remote
|
||||
uint64_t totalRows; // total number of rows
|
||||
uint64_t totalElapsed; // total elapsed time
|
||||
} SLoadRemoteDataInfo;
|
||||
|
||||
typedef struct SExchangeInfo {
|
||||
SArray* pSources;
|
||||
SArray* pSourceDataInfo;
|
||||
|
@ -409,9 +393,7 @@ typedef struct SExchangeInfo {
|
|||
SSDataBlock* pResult;
|
||||
bool seqLoadData; // sequential load data or not, false by default
|
||||
int32_t current;
|
||||
uint64_t totalSize; // total load bytes from remote
|
||||
uint64_t totalRows; // total number of rows
|
||||
uint64_t totalElapsed; // total elapsed time
|
||||
SLoadRemoteDataInfo loadInfo;
|
||||
} SExchangeInfo;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
|
@ -456,19 +438,17 @@ typedef struct SSysTableScanInfo {
|
|||
void* readHandle;
|
||||
};
|
||||
|
||||
void *pCur; // cursor
|
||||
SRetrieveTableReq* pReq;
|
||||
SEpSet epSet;
|
||||
int32_t type; // show type
|
||||
tsem_t ready;
|
||||
SSchema* pSchema;
|
||||
SSDataBlock* pRes;
|
||||
|
||||
int32_t capacity;
|
||||
int64_t numOfBlocks; // extract basic running information.
|
||||
int64_t totalRows;
|
||||
int64_t elapsedTime;
|
||||
int64_t totalBytes;
|
||||
SRetrieveMetaTableRsp *pRsp;
|
||||
void *pCur; // cursor
|
||||
SRetrieveTableReq req;
|
||||
SEpSet epSet;
|
||||
int32_t type; // show type
|
||||
tsem_t ready;
|
||||
SSchema* pSchema;
|
||||
SSDataBlock* pRes;
|
||||
int32_t capacity;
|
||||
int64_t numOfBlocks; // extract basic running information.
|
||||
SLoadRemoteDataInfo loadInfo;
|
||||
} SSysTableScanInfo;
|
||||
|
||||
typedef struct SOptrBasicInfo {
|
||||
|
@ -649,10 +629,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
|
|||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
|
||||
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, SEpSet epset,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
|
||||
|
@ -688,8 +666,10 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
|
|||
|
||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
|
||||
int32_t numOfOutput);
|
||||
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
|
||||
// SSDataBlock* doSLimit(void* param, bool* newgroup);
|
||||
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
|
||||
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
|
||||
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
||||
|
@ -706,9 +686,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
|||
|
||||
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
|
||||
|
||||
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
|
||||
int32_t prevResultLen, void* merger);
|
||||
|
||||
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
|
||||
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
|
||||
|
||||
|
@ -720,11 +697,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg);
|
|||
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||
int32_t checkForQueryBuf(size_t numOfTables);
|
||||
bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
|
||||
void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status);
|
||||
|
||||
int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen);
|
||||
|
||||
size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows);
|
||||
void setTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||
|
||||
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
|
||||
|
@ -734,8 +707,6 @@ void calculateOperatorProfResults(SQInfo* pQInfo);
|
|||
void queryCostStatis(SExecTaskInfo* pTaskInfo);
|
||||
|
||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||
void freeQueryAttr(STaskAttr* pQuery);
|
||||
|
||||
int32_t getMaximumIdleDurationSec();
|
||||
|
||||
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
|
||||
|
|
|
@ -4975,35 +4975,36 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) {
|
||||
char* pData = pDataInfo->pRsp->data;
|
||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
||||
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen,
|
||||
int32_t numOfOutput, int64_t startTs, uint64_t* total) {
|
||||
// char* pData = pRsp->data;
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||
|
||||
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows);
|
||||
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
size_t len = pRsp->numOfRows * pColInfoData->info.bytes;
|
||||
size_t len = numOfRows * pColInfoData->info.bytes;
|
||||
memcpy(tmp, pData, len);
|
||||
|
||||
pColInfoData->pData = tmp;
|
||||
pData += len;
|
||||
}
|
||||
|
||||
pRes->info.rows = pRsp->numOfRows;
|
||||
pRes->info.rows = numOfRows;
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
|
||||
pExchangeInfo->totalRows += pRsp->numOfRows;
|
||||
pExchangeInfo->totalSize += pRsp->compLen;
|
||||
pDataInfo->totalRows += pRsp->numOfRows;
|
||||
pLoadInfo->totalRows += numOfRows;
|
||||
pLoadInfo->totalSize += compLen;
|
||||
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
if (total != NULL) {
|
||||
*total += numOfRows;
|
||||
}
|
||||
|
||||
pLoadInfo->totalElapsed += el;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -5012,11 +5013,12 @@ static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
pLoadInfo->totalElapsed += el;
|
||||
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
|
||||
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
|
@ -5045,17 +5047,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
|
|||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
|
||||
|
||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
|
||||
pExchangeInfo->totalRows);
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
pExchangeInfo->loadInfo.totalRows);
|
||||
pDataInfo->status = DATA_EXHAUSTED;
|
||||
completed += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
|
||||
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
|
||||
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
|
||||
if (code != 0) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -5064,13 +5068,13 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
|
|||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
|
||||
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1,
|
||||
totalSources);
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows,
|
||||
pExchangeInfo->totalSize);
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
|
||||
pLoadInfo->totalSize);
|
||||
}
|
||||
|
||||
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
|
||||
|
@ -5139,10 +5143,12 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
|
|||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
|
||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
|
||||
if (pRsp->numOfRows == 0) {
|
||||
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||
pDataInfo->totalRows, pExchangeInfo->totalRows);
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
pExchangeInfo->current += 1;
|
||||
|
@ -5150,20 +5156,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
|
|||
}
|
||||
|
||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||
setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
|
||||
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||
int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
|
||||
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
|
||||
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1,
|
||||
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
||||
totalSources);
|
||||
|
||||
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||
pExchangeInfo->current += 1;
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize);
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||
}
|
||||
|
||||
return pExchangeInfo->pResult;
|
||||
|
@ -5196,16 +5204,13 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
SExchangeInfo *pExchangeInfo = pOperator->info;
|
||||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t code = pOperator->_openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -5483,18 +5488,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param;
|
||||
pSourceDataInfo->pRsp = pMsg->pData;
|
||||
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param;
|
||||
pScanResInfo->pRsp = pMsg->pData;
|
||||
|
||||
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
||||
SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
|
||||
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
||||
pRsp->useconds = htobe64(pRsp->useconds);
|
||||
pRsp->handle = htobe64(pRsp->handle);
|
||||
pRsp->compLen = htonl(pRsp->compLen);
|
||||
|
||||
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
|
||||
tsem_post(&pSourceDataInfo->pEx->ready);
|
||||
tsem_post(&pScanResInfo->ready);
|
||||
}
|
||||
|
||||
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||
|
@ -5521,22 +5524,19 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
|||
}
|
||||
}
|
||||
|
||||
pInfo->totalRows += numOfRows;
|
||||
pInfo->loadInfo.totalRows += numOfRows;
|
||||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
// pInfo->elapsedTime;
|
||||
// pInfo->totalBytes;
|
||||
return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes;
|
||||
} else { // load the meta from mnode of the given epset
|
||||
if (pInfo->pReq == NULL) {
|
||||
pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq));
|
||||
if (pInfo->pReq == NULL) {
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
|
||||
pInfo->pReq->type = pInfo->type;
|
||||
}
|
||||
pInfo->req.type = pInfo->type;
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||
char* buf1 = calloc(1, contLen);
|
||||
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||
|
||||
// send the fetch remote task result reques
|
||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
|
@ -5546,24 +5546,38 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pMsgSendInfo->param = NULL;
|
||||
pMsgSendInfo->msgInfo.pData = pInfo->pReq;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
|
||||
pMsgSendInfo->param = pInfo;
|
||||
pMsgSendInfo->msgInfo.pData = buf1;
|
||||
pMsgSendInfo->msgInfo.len = contLen;
|
||||
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
|
||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
pMsgSendInfo->fp = loadSysTableContentCb;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
||||
|
||||
tsem_wait(&pInfo->ready);
|
||||
// handle the response and return to the caller
|
||||
|
||||
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
||||
pInfo->req.showId = pRsp->handle;
|
||||
|
||||
if (pRsp->numOfRows == 0) {
|
||||
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||
// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||
// pDataInfo->totalRows, pExchangeInfo->totalRows);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
|
||||
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
|
||||
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
|
||||
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
|
||||
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType,
|
||||
SEpSet epset, SExecTaskInfo* pTaskInfo) {
|
||||
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -5573,7 +5587,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// todo: create the schema of result data block
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->capacity = 4096;
|
||||
pInfo->type = tableType;
|
||||
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
|
||||
|
@ -5590,70 +5604,35 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
|
|||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
pOperator->getNextFn = doSysTableScan;
|
||||
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||
pOperator->nextDataFn = doSysTableScan;
|
||||
pOperator->closeFn = destroySysTableScannerOperatorInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
#if 1
|
||||
{ // todo refactor
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "DB-META";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = qProcessFetchRsp;
|
||||
rpcInit.sessions = tsMaxConnections;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.user = (char *)"root";
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.ckey = "key";
|
||||
rpcInit.spi = 1;
|
||||
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
|
||||
|
||||
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
|
||||
assert(pTableScanInfo != NULL && pDownstream != NULL);
|
||||
|
||||
pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr
|
||||
pTableScanInfo->numOfOutput = pDownstream->numOfOutput;
|
||||
#if 0
|
||||
if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) {
|
||||
SAggOperatorInfo* pAggInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
||||
} else if (pDownstream->operatorType == OP_TimeWindow || pDownstream->operatorType == OP_AllTimeWindow) {
|
||||
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset;
|
||||
|
||||
} else if (pDownstream->operatorType == OP_Groupby) {
|
||||
SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset;
|
||||
|
||||
} else if (pDownstream->operatorType == OP_MultiTableTimeInterval || pDownstream->operatorType == OP_AllMultiTableTimeInterval) {
|
||||
STableIntervalOperatorInfo *pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
|
||||
|
||||
} else if (pDownstream->operatorType == OP_Project) {
|
||||
SProjectOperatorInfo *pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
||||
} else if (pDownstream->operatorType == OP_SessionWindow) {
|
||||
SSWindowOperatorInfo* pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
||||
} else if (pDownstream->operatorType == OP_StateWindow) {
|
||||
SStateWindowOperatorInfo* pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
||||
} else {
|
||||
assert(0);
|
||||
pInfo->pTransporter = rpcOpen(&rpcInit);
|
||||
if (pInfo->pTransporter == NULL) {
|
||||
return NULL; // todo
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
SArray* getOrderCheckColumns(STaskAttr* pQuery) {
|
||||
|
@ -7355,23 +7334,23 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
|
|||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows);
|
||||
pInfo->binfo.capacity = 4096;
|
||||
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
|
||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
// initResultRowInfo(&pBInfo->resultRowInfo, 8);
|
||||
// setFunctionResultOutput(pBInfo, MAIN_SCAN);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "ProjectOperator";
|
||||
// pOperator->operatorType = OP_Project;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pExpr = exprArrayDup(pExprInfo);
|
||||
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
|
||||
pOperator->getNextFn = doProjectOperation;
|
||||
pOperator->nextDataFn = doProjectOperation;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->closeFn = destroyProjectOperatorInfo;
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
||||
|
@ -8039,37 +8018,6 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SExprBasicInfo *pExpr
|
|||
return j != INT32_MIN;
|
||||
}
|
||||
|
||||
static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SExprBasicInfo** pExpr, int32_t numOfOutput,
|
||||
SColumnInfo* pTagCols, void* pMsg) {
|
||||
int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags;
|
||||
if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) {
|
||||
//qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pMsg, pTableInfo->numOfCols, pTableInfo->numOfTags);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (numOfTotal == 0) { // table total columns are not required.
|
||||
// for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
// SExprBasicInfo* p = pExpr[i];
|
||||
// if ((p->functionId == FUNCTION_TAGPRJ) ||
|
||||
// (p->functionId == FUNCTION_TID_TAG && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
|
||||
// (p->functionId == FUNCTION_COUNT && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) ||
|
||||
// (p->functionId == FUNCTION_BLKINFO)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// return false;
|
||||
// }
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
if (!validateExprColumnInfo(pTableInfo, pExpr[i], pTagCols)) {
|
||||
return TSDB_CODE_QRY_INVALID_MSG;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) {
|
||||
for (int32_t f = 0; f < numOfFilters; ++f) {
|
||||
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg);
|
||||
|
@ -8104,10 +8052,10 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
|
|||
static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) {
|
||||
SResSchema s = {0};
|
||||
s.scale = scale;
|
||||
s.precision = precision;
|
||||
s.type = type;
|
||||
s.bytes = bytes;
|
||||
s.colId = slotId;
|
||||
s.precision = precision;
|
||||
strncpy(s.name, name, tListLen(s.name));
|
||||
|
||||
return s;
|
||||
|
@ -8243,9 +8191,16 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols);
|
||||
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo);
|
||||
|
||||
taosArrayDestroy(tableIdList);
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
||||
SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
||||
|
||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, TSDB_MGMT_TABLE_DB, pSysScanPhyNode->mgmtEpSet, pTaskInfo);
|
||||
return pOperator;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
|||
void minFunction(SqlFunctionCtx* pCtx);
|
||||
void maxFunction(SqlFunctionCtx *pCtx);
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
void firstFunction(SqlFunctionCtx *pCtx);
|
||||
void lastFunction(SqlFunctionCtx *pCtx);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -61,6 +61,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = maxFunction,
|
||||
.finalizeFunc = functionFinalizer
|
||||
},
|
||||
{
|
||||
.name = "first",
|
||||
.type = FUNCTION_TYPE_FIRST,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = firstFunction,
|
||||
.finalizeFunc = functionFinalizer
|
||||
},
|
||||
{
|
||||
.name = "last",
|
||||
.type = FUNCTION_TYPE_LAST,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.checkFunc = stubCheckAndGetResultType,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastFunction,
|
||||
.finalizeFunc = functionFinalizer
|
||||
},
|
||||
{
|
||||
.name = "concat",
|
||||
.type = FUNCTION_TYPE_CONCAT,
|
||||
|
@ -98,6 +118,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
|
|||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType };
|
||||
break;
|
||||
}
|
||||
case FUNCTION_TYPE_FIRST:
|
||||
case FUNCTION_TYPE_LAST:
|
||||
case FUNCTION_TYPE_MIN:
|
||||
case FUNCTION_TYPE_MAX: {
|
||||
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
|
|
|
@ -72,13 +72,12 @@ void countFunction(SqlFunctionCtx *pCtx) {
|
|||
int32_t numOfElem = 0;
|
||||
|
||||
/*
|
||||
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true;
|
||||
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true;
|
||||
* 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false;
|
||||
* 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true;
|
||||
* 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true;
|
||||
* 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false;
|
||||
*/
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) {
|
||||
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
|
||||
ASSERT(numOfElem >= 0);
|
||||
|
@ -173,7 +172,7 @@ void sumFunction(SqlFunctionCtx *pCtx) {
|
|||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
}
|
||||
|
||||
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SSumRes);
|
||||
return true;
|
||||
}
|
||||
|
@ -265,8 +264,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(int64_t);
|
||||
return true;
|
||||
}
|
||||
|
@ -278,34 +276,34 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t* d = (_t*)((_col)->pData); \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
|
@ -445,4 +443,118 @@ void minFunction(SqlFunctionCtx *pCtx) {
|
|||
void maxFunction(SqlFunctionCtx *pCtx) {
|
||||
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||
}
|
||||
}
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
pEnv->calcMemSize = pNode->node.resType.bytes;
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO fix this
|
||||
// This ordinary first function only handle the data block in ascending order
|
||||
void firstFunction(SqlFunctionCtx *pCtx) {
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
|
||||
ASSERT(pInputCol->hasNull == true);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check for the first not null data
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
memcpy(buf, data, pInputCol->info.bytes);
|
||||
// TODO handle the subsidary value
|
||||
// if (pCtx->ptsList != NULL) {
|
||||
// TSKEY k = GET_TS_DATA(pCtx, i);
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, k);
|
||||
// }
|
||||
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
pResInfo->complete = true;
|
||||
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, numOfElems, 1);
|
||||
}
|
||||
|
||||
void lastFunction(SqlFunctionCtx *pCtx) {
|
||||
if (pCtx->order != TSDB_ORDER_DESC) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
// All null data column, return directly.
|
||||
if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) {
|
||||
ASSERT(pInputCol->hasNull == true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
memcpy(buf, data, pInputCol->info.bytes);
|
||||
|
||||
// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
pResInfo->complete = true; // set query completed on this column
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
} else { // ascending order
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
|
||||
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) {
|
||||
pResInfo->hasResult = DATA_SET_FLAG;
|
||||
memcpy(buf, data, pCtx->inputBytes);
|
||||
|
||||
*(TSKEY*)buf = ts;
|
||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, numOfElems, 1);
|
||||
}
|
||||
|
|
|
@ -107,6 +107,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "PhysiTableSeqScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
return "PhysiSreamScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
return "PhysiSystemTableScan";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return "PhysiProject";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -440,6 +442,87 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkEndPointFqdn = "Fqdn";
|
||||
static const char* jkEndPointPort = "Port";
|
||||
|
||||
static int32_t epToJson(const void* pObj, SJson* pJson) {
|
||||
const SEp* pNode = (const SEp*)pObj;
|
||||
|
||||
int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
|
||||
SEp* pNode = (SEp*)pObj;
|
||||
|
||||
int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkEpSetInUse = "InUse";
|
||||
static const char* jkEpSetNumOfEps = "NumOfEps";
|
||||
static const char* jkEpSetEps = "Eps";
|
||||
|
||||
static int32_t epSetToJson(const void* pObj, SJson* pJson) {
|
||||
const SEpSet* pNode = (const SEpSet*)pObj;
|
||||
|
||||
int32_t code = tjsonAddIntegerToObject(pJson, jkEpSetInUse, pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkEpSetNumOfEps, pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddArray(pJson, jkEpSetEps, epToJson, pNode->eps, sizeof(SEp), pNode->numOfEps);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToEpSet(const SJson* pJson, void* pObj) {
|
||||
SEpSet* pNode = (SEpSet*)pObj;
|
||||
|
||||
int32_t code = tjsonGetTinyIntValue(pJson, jkEpSetInUse, &pNode->inUse);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetTinyIntValue(pJson, jkEpSetNumOfEps, &pNode->numOfEps);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToArray(pJson, jkEpSetEps, jsonToEp, pNode->eps, sizeof(SEp));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
|
||||
|
||||
static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj;
|
||||
|
||||
int32_t code = physiScanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, epSetToJson, &pNode->mgmtEpSet);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
|
||||
SSystemTableScanPhysiNode* pNode = (SSystemTableScanPhysiNode*)pObj;
|
||||
|
||||
int32_t code = jsonToPhysiScanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonToObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, jsonToEpSet, &pNode->mgmtEpSet);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkProjectPhysiPlanProjections = "Projections";
|
||||
|
||||
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -625,31 +708,6 @@ static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkEndPointFqdn = "Fqdn";
|
||||
static const char* jkEndPointPort = "Port";
|
||||
|
||||
static int32_t epToJson(const void* pObj, SJson* pJson) {
|
||||
const SEp* pNode = (const SEp*)pObj;
|
||||
|
||||
int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
|
||||
SEp* pNode = (SEp*)pObj;
|
||||
|
||||
int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkQueryNodeAddrId = "Id";
|
||||
static const char* jkQueryNodeAddrInUse = "InUse";
|
||||
static const char* jkQueryNodeAddrNumOfEps = "NumOfEps";
|
||||
|
@ -1490,6 +1548,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
return physiSysTableScanNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return physiProjectNodeToJson(pObj, pJson);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
@ -1567,6 +1627,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToPhysiTagScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
return jsonToPhysiTableScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
return jsonToPhysiSysTableScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return jsonToPhysiProjectNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
|
|
@ -146,6 +146,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(STableSeqScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
return makeNode(type, sizeof(SNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
return makeNode(type, sizeof(SSystemTableScanPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
|
||||
return makeNode(type, sizeof(SProjectPhysiNode));
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
|
||||
|
|
|
@ -23,6 +23,13 @@ extern "C" {
|
|||
#include "os.h"
|
||||
#include "query.h"
|
||||
|
||||
#define parserFatal(param, ...) qFatal("PARSER: " param, __VA_ARGS__)
|
||||
#define parserError(param, ...) qError("PARSER: " param, __VA_ARGS__)
|
||||
#define parserWarn(param, ...) qWarn("PARSER: " param, __VA_ARGS__)
|
||||
#define parserInfo(param, ...) qInfo("PARSER: " param, __VA_ARGS__)
|
||||
#define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__)
|
||||
#define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__)
|
||||
|
||||
typedef struct SMsgBuf {
|
||||
int32_t len;
|
||||
char *buf;
|
||||
|
|
|
@ -396,9 +396,9 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t
|
|||
return pCxt->valid;
|
||||
}
|
||||
|
||||
static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName) {
|
||||
static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName, bool query) {
|
||||
if (NULL == pDbName) {
|
||||
return true;
|
||||
return (query ? NULL != pCxt->pQueryCxt->db : true);
|
||||
}
|
||||
pCxt->valid = pDbName->n < TSDB_DB_NAME_LEN ? true : false;
|
||||
return pCxt->valid;
|
||||
|
@ -557,7 +557,7 @@ SNode* createNodeListNode(SAstCreateContext* pCxt, SNodeList* pList) {
|
|||
}
|
||||
|
||||
SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const SToken* pTableName, const SToken* pTableAlias) {
|
||||
if (!checkDbName(pCxt, pDbName) || !checkTableName(pCxt, pTableName)) {
|
||||
if (!checkDbName(pCxt, pDbName, true) || !checkTableName(pCxt, pTableName)) {
|
||||
return NULL;
|
||||
}
|
||||
SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
|
@ -769,7 +769,7 @@ SDatabaseOptions* setDatabaseOption(SAstCreateContext* pCxt, SDatabaseOptions* p
|
|||
}
|
||||
|
||||
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SDatabaseOptions* pOptions) {
|
||||
if (!checkDbName(pCxt, pDbName)) {
|
||||
if (!checkDbName(pCxt, pDbName, false)) {
|
||||
return NULL;
|
||||
}
|
||||
SCreateDatabaseStmt* pStmt = (SCreateDatabaseStmt*)nodesMakeNode(QUERY_NODE_CREATE_DATABASE_STMT);
|
||||
|
@ -782,7 +782,7 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons
|
|||
}
|
||||
|
||||
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pDbName) {
|
||||
if (!checkDbName(pCxt, pDbName)) {
|
||||
if (!checkDbName(pCxt, pDbName, false)) {
|
||||
return NULL;
|
||||
}
|
||||
SDropDatabaseStmt* pStmt = (SDropDatabaseStmt*)nodesMakeNode(QUERY_NODE_DROP_DATABASE_STMT);
|
||||
|
@ -904,7 +904,7 @@ SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
|
|||
}
|
||||
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDbName) {
|
||||
if (!checkDbName(pCxt, pDbName)) {
|
||||
if (!checkDbName(pCxt, pDbName, false)) {
|
||||
return NULL;
|
||||
}
|
||||
SShowStmt* pStmt = nodesMakeNode(type);;
|
||||
|
|
|
@ -1032,7 +1032,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
|||
};
|
||||
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -1051,6 +1050,5 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
|||
code = parseInsertBody(&context);
|
||||
}
|
||||
destroyInsertParseContext(&context);
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -69,14 +69,78 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SName* toName(int32_t acctId, const SRealTableNode* pRealTable, SName* pName) {
|
||||
static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) {
|
||||
pName->type = TSDB_TABLE_NAME_T;
|
||||
pName->acctId = acctId;
|
||||
strcpy(pName->dbname, pRealTable->table.dbName);
|
||||
strcpy(pName->tname, pRealTable->table.tableName);
|
||||
strcpy(pName->dbname, pDbName);
|
||||
strcpy(pName->tname, pTableName);
|
||||
return pName;
|
||||
}
|
||||
|
||||
static int32_t getTableMetaImpl(SParseContext* pCxt, const SName* pName, STableMeta** pMeta) {
|
||||
int32_t code = catalogGetTableMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pMeta);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableMeta(SParseContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) {
|
||||
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId };
|
||||
strcpy(name.dbname, pDbName);
|
||||
strcpy(name.tname, pTableName);
|
||||
return getTableMetaImpl(pCxt, &name, pMeta);
|
||||
}
|
||||
|
||||
static int32_t getTableDistVgInfo(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) {
|
||||
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pVgInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getDBVgInfoImpl(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) {
|
||||
char fullDbName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pName, fullDbName);
|
||||
int32_t code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, pVgInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getDBVgInfo(SParseContext* pCxt, const char* pDbName, SArray** pVgInfo) {
|
||||
SName name;
|
||||
tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName));
|
||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
return getDBVgInfoImpl(pCxt, &name, pVgInfo);
|
||||
}
|
||||
|
||||
static int32_t getTableHashVgroupImpl(SParseContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
|
||||
int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pInfo);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) {
|
||||
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId };
|
||||
strcpy(name.dbname, pDbName);
|
||||
strcpy(name.tname, pTableName);
|
||||
return getTableHashVgroupImpl(pCxt, &name, pInfo);
|
||||
}
|
||||
|
||||
static int32_t getDBVgVersion(SParseContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) {
|
||||
int32_t code = catalogGetDBVgVersion(pCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) {
|
||||
int cmp = 0;
|
||||
if ('\0' != pCol->dbName[0]) {
|
||||
|
@ -498,25 +562,35 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) {
|
||||
static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
|
||||
size_t vgroupNum = taosArrayGetSize(pVgs);
|
||||
*pVgsInfo = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
|
||||
if (NULL == *pVgsInfo) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(*pVgsInfo)->numOfVgroups = vgroupNum;
|
||||
for (int32_t i = 0; i < vgroupNum; ++i) {
|
||||
SVgroupInfo *vg = taosArrayGet(pVgs, i);
|
||||
(*pVgsInfo)->vgroups[i] = *vg;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) {
|
||||
SArray* vgroupList = NULL;
|
||||
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
code = getTableDistVgInfo(pCxt, pName, &vgroupList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
|
||||
}
|
||||
|
||||
size_t vgroupNum = taosArrayGetSize(vgroupList);
|
||||
pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
|
||||
if (NULL == pRealTable->pVgroupList) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(vgroupList);
|
||||
} else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) {
|
||||
SArray* vgroupList = NULL;
|
||||
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
|
||||
}
|
||||
pRealTable->pVgroupList->numOfVgroups = vgroupNum;
|
||||
for (int32_t i = 0; i < vgroupNum; ++i) {
|
||||
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
|
||||
pRealTable->pVgroupList->vgroups[i] = *vg;
|
||||
}
|
||||
|
||||
taosArrayDestroy(vgroupList);
|
||||
} else {
|
||||
pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
|
||||
|
@ -524,12 +598,9 @@ static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNo
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pRealTable->pVgroupList->numOfVgroups = 1;
|
||||
int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, pRealTable->pVgroupList->vgroups);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
code = getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
||||
|
@ -538,8 +609,8 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
case QUERY_NODE_REAL_TABLE: {
|
||||
SRealTableNode* pRealTable = (SRealTableNode*)pTable;
|
||||
SName name;
|
||||
code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet),
|
||||
toName(pCxt->pParseCxt->acctId, pRealTable, &name), &(pRealTable->pMeta));
|
||||
code = getTableMetaImpl(pCxt->pParseCxt,
|
||||
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), &(pRealTable->pMeta));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
|
||||
}
|
||||
|
@ -894,11 +965,10 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
|
|||
static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) {
|
||||
SDropTableClause* pClause = nodesListGetNode(pStmt->pTables, 0);
|
||||
|
||||
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(tableName.dbname, pClause->dbName);
|
||||
strcpy(tableName.tname, pClause->tableName);
|
||||
STableMeta* pTableMeta = NULL;
|
||||
int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), &tableName, &pTableMeta);
|
||||
SName tableName;
|
||||
int32_t code = getTableMetaImpl(
|
||||
pCxt->pParseCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
|
||||
code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists);
|
||||
|
@ -906,8 +976,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
|
|||
// todo : drop normal table or child table
|
||||
code = TSDB_CODE_FAILED;
|
||||
}
|
||||
tfree(pTableMeta);
|
||||
}
|
||||
tfree(pTableMeta);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -920,13 +990,14 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS
|
|||
}
|
||||
|
||||
static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* pStmt) {
|
||||
SUseDbReq usedbReq = {0};
|
||||
SName name = {0};
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
|
||||
|
||||
SUseDbReq usedbReq = {0};
|
||||
tNameExtractFullName(&name, usedbReq.db);
|
||||
|
||||
catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
|
||||
int32_t code = getDBVgVersion(pCxt->pParseCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL== pCxt->pCmdMsg) {
|
||||
|
@ -1102,19 +1173,14 @@ static int32_t translateShow(STranslateContext* pCxt, SShowStmt* pStmt) {
|
|||
}
|
||||
|
||||
static int32_t translateShowTables(STranslateContext* pCxt) {
|
||||
SName name = {0};
|
||||
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
|
||||
if (pCxt->pParseCxt->db == NULL || strlen(pCxt->pParseCxt->db) == 0) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_INVALID_OPERATION, "db not specified");
|
||||
}
|
||||
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
|
||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
|
||||
SArray* array = NULL;
|
||||
int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, &array);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
int32_t code = getDBVgInfo(pCxt->pParseCxt, pCxt->pParseCxt->db, &array);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
SVgroupInfo* info = taosArrayGet(array, 0);
|
||||
|
@ -1236,6 +1302,25 @@ static void destroyTranslateContext(STranslateContext* pCxt) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t rewriteShowDatabase(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SSelectStmt* pStmt = nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
if (NULL == pStmt) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
if (NULL == pTable) {
|
||||
nodesDestroyNode(pStmt);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB);
|
||||
strcpy(pTable->table.tableName, TSDB_INS_TABLE_USER_DATABASES);
|
||||
pStmt->pFromTable = (SNode*)pTable;
|
||||
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
pQuery->pRoot = (SNode*)pStmt;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SVgroupTablesBatch {
|
||||
SVCreateTbBatchReq req;
|
||||
SVgroupInfo info;
|
||||
|
@ -1325,13 +1410,6 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
|
|||
taosArrayDestroy(pTbBatch->req.pArray);
|
||||
}
|
||||
|
||||
static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) {
|
||||
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId };
|
||||
strcpy(name.dbname, pDbName);
|
||||
strcpy(name.tname, pTableName);
|
||||
return catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, &name, pInfo);
|
||||
}
|
||||
|
||||
static int32_t rewriteToVnodeModifOpStmt(SQuery* pQuery, SArray* pBufArray) {
|
||||
SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (pNewStmt == NULL) {
|
||||
|
@ -1516,12 +1594,9 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
|
|||
}
|
||||
|
||||
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {
|
||||
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(name.dbname, pStmt->useDbName);
|
||||
strcpy(name.tname, pStmt->useTableName);
|
||||
STableMeta* pSuperTableMeta = NULL;
|
||||
int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &name, &pSuperTableMeta);
|
||||
|
||||
int32_t code = getTableMeta(pCxt->pParseCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tdInitKVRowBuilder(&kvRowBuilder);
|
||||
|
@ -1609,6 +1684,9 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
|
|||
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pQuery->pRoot)) {
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
code = rewriteShowDatabase(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
if (NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) {
|
||||
code = rewriteCreateTable(pCxt, pQuery);
|
||||
|
|
|
@ -38,11 +38,14 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
|
|||
}
|
||||
|
||||
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||
return parseInsertSql(pCxt, pQuery);
|
||||
code = parseInsertSql(pCxt, pQuery);
|
||||
} else {
|
||||
return parseSqlIntoAst(pCxt, pQuery);
|
||||
code = parseSqlIntoAst(pCxt, pQuery);
|
||||
}
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
|
||||
void qDestroyQuery(SQuery* pQueryNode) {
|
||||
|
|
|
@ -123,6 +123,26 @@ error:
|
|||
return pRoot;
|
||||
}
|
||||
|
||||
static EScanType getScanType(SNodeList* pScanCols, STableMeta* pMeta) {
|
||||
if (NULL == pScanCols) {
|
||||
// select count(*) from t
|
||||
return SCAN_TYPE_TABLE;
|
||||
}
|
||||
|
||||
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
|
||||
return SCAN_TYPE_SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pScanCols) {
|
||||
if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) {
|
||||
return SCAN_TYPE_TABLE;
|
||||
}
|
||||
}
|
||||
|
||||
return SCAN_TYPE_TAG;
|
||||
}
|
||||
|
||||
static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
|
||||
CHECK_ALLOC(pScan, NULL);
|
||||
|
@ -145,7 +165,7 @@ static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
|
|||
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
|
||||
}
|
||||
|
||||
pScan->scanType = SCAN_TYPE_TABLE;
|
||||
pScan->scanType = getScanType(pCols, pScan->pMeta);
|
||||
pScan->scanFlag = MAIN_SCAN;
|
||||
pScan->scanRange = TSWINDOW_INITIALIZER;
|
||||
pScan->tableName.type = TSDB_TABLE_NAME_T;
|
||||
|
|
|
@ -199,20 +199,27 @@ static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) {
|
|||
}
|
||||
|
||||
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
|
||||
pScanPhysiNode->pScanCols = nodesMakeList();
|
||||
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanPhysiNode)
|
||||
|| QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN == nodeType(pScanPhysiNode)) {
|
||||
pScanPhysiNode->pScanCols = nodesMakeList();
|
||||
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
|
||||
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pScanCols) {
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) {
|
||||
SColumnNode* pCol = nodesListGetNode(pScanPhysiNode->pScanCols, 0);
|
||||
strcpy(pCol->tableAlias, ((SColumnNode*)pNode)->tableAlias);
|
||||
strcpy(pCol->colName, ((SColumnNode*)pNode)->colName);
|
||||
continue;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pScanCols) {
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) {
|
||||
SColumnNode* pCol = nodesListGetNode(pScanPhysiNode->pScanCols, 0);
|
||||
strcpy(pCol->tableAlias, ((SColumnNode*)pNode)->tableAlias);
|
||||
strcpy(pCol->colName, ((SColumnNode*)pNode)->colName);
|
||||
continue;
|
||||
}
|
||||
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode)));
|
||||
}
|
||||
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode)));
|
||||
} else {
|
||||
pScanPhysiNode->pScanCols = nodesCloneList(pScanCols);
|
||||
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -260,13 +267,27 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p
|
|||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
|
||||
CHECK_ALLOC(pScan, NULL);
|
||||
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan), (SPhysiNode*)pScan);
|
||||
for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) {
|
||||
SQueryNodeAddr addr;
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr);
|
||||
taosArrayPush(pCxt->pExecNodeList, &addr);
|
||||
}
|
||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
return (SPhysiNode*)pScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return createTagScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_TABLE:
|
||||
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
|
||||
case SCAN_TYPE_STABLE:
|
||||
case SCAN_TYPE_SYSTEM_TABLE:
|
||||
return createSystemTableScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_STREAM:
|
||||
break;
|
||||
default:
|
||||
|
@ -769,7 +790,7 @@ static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) {
|
|||
|
||||
static int32_t doBuildPhysiPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) {
|
||||
SSubplan* pSubplan = createPhysiSubplan(pCxt, pLogicSubplan);
|
||||
CHECK_ALLOC(pSubplan, DEAL_RES_ERROR);
|
||||
CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE_EXT(pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans));
|
||||
++(pQueryPlan->numOfSubplans);
|
||||
if (NULL != pParent) {
|
||||
|
|
|
@ -56,7 +56,7 @@ protected:
|
|||
const string syntaxTreeStr = toString(query_->pRoot, false);
|
||||
|
||||
SLogicNode* pLogicPlan = nullptr;
|
||||
SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot };
|
||||
SPlanContext cxt = { .queryId = 1, .acctId = 0, .mgmtEpSet = {0}, .pAstRoot = query_->pRoot };
|
||||
code = createLogicPlan(&cxt, &pLogicPlan);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||
|
|
|
@ -172,7 +172,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
|
|||
}
|
||||
|
||||
if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE &&
|
||||
pMetaMsg->tableType != TSDB_NORMAL_TABLE) {
|
||||
pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE) {
|
||||
qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ sql connect
|
|||
print =============== create database
|
||||
sql create database d1 vgroups 2
|
||||
sql show databases
|
||||
if $rows != 1 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -40,7 +40,7 @@ endi
|
|||
print =============== drop database
|
||||
sql drop database d1
|
||||
sql show databases
|
||||
if $rows != 0 then
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -49,7 +49,7 @@ sql create database d2 vgroups 2
|
|||
sql create database d3 vgroups 3
|
||||
sql create database d4 vgroups 4
|
||||
sql show databases
|
||||
if $rows != 3 then
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -111,7 +111,7 @@ print =============== drop database
|
|||
sql drop database d2
|
||||
sql drop database d3
|
||||
sql show databases
|
||||
if $rows != 1 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -154,7 +154,7 @@ system sh/exec.sh -n dnode1 -s start
|
|||
|
||||
print =============== show databases
|
||||
sql show databases
|
||||
if $rows != 1 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -24,8 +24,8 @@ endi
|
|||
print ========== stop dnode2
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
|
||||
|
||||
print =============== create database
|
||||
sql_error create database d1 vgroups 4
|
||||
#print =============== create database
|
||||
#sql_error create database d1 vgroups 4
|
||||
|
||||
print ========== start dnode2
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
@ -66,6 +66,7 @@ sql_error drop database d1
|
|||
|
||||
print ========== start dnode2
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
sleep 1000
|
||||
|
||||
print =============== re-create database
|
||||
$x = 0
|
||||
|
|
Loading…
Reference in New Issue