feature[query]:refactor show query processing in mnode.
This commit is contained in:
parent
bece2cbd35
commit
208f639806
|
@ -99,6 +99,8 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_INS_TABLE_MNODES "mnodes"
|
#define TSDB_INS_TABLE_MNODES "mnodes"
|
||||||
#define TSDB_INS_TABLE_MODULES "modules"
|
#define TSDB_INS_TABLE_MODULES "modules"
|
||||||
#define TSDB_INS_TABLE_QNODES "qnodes"
|
#define TSDB_INS_TABLE_QNODES "qnodes"
|
||||||
|
#define TSDB_INS_TABLE_BNODES "bnodes"
|
||||||
|
#define TSDB_INS_TABLE_CLUSTER "cluster"
|
||||||
#define TSDB_INS_TABLE_USER_DATABASES "user_databases"
|
#define TSDB_INS_TABLE_USER_DATABASES "user_databases"
|
||||||
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
|
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
|
||||||
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
|
||||||
|
|
|
@ -69,7 +69,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t showId;
|
int64_t showId;
|
||||||
ShowMetaFp metaFps[TSDB_MGMT_TABLE_MAX];
|
|
||||||
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
|
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
|
||||||
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
|
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
|
||||||
SCacheObj *cache;
|
SCacheObj *cache;
|
||||||
|
|
|
@ -24,7 +24,6 @@ extern "C" {
|
||||||
|
|
||||||
int32_t mndInitShow(SMnode *pMnode);
|
int32_t mndInitShow(SMnode *pMnode);
|
||||||
void mndCleanupShow(SMnode *pMnode);
|
void mndCleanupShow(SMnode *pMnode);
|
||||||
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
|
|
||||||
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
|
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
|
||||||
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
|
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
|
||||||
void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
|
void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
|
||||||
|
|
|
@ -51,7 +51,6 @@ int32_t mndInitBnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndGetBnodeMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);
|
||||||
|
|
||||||
|
@ -440,46 +439,6 @@ static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "id");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "endpoint");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -26,7 +26,6 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
|
||||||
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
|
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
|
||||||
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
|
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
|
||||||
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
|
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
|
||||||
static int32_t mndGetClusterMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -40,7 +39,6 @@ int32_t mndInitCluster(SMnode *pMnode) {
|
||||||
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
|
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
|
||||||
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
|
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndGetClusterMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
@ -180,44 +178,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
||||||
return sdbWrite(pMnode->pSdb, pRaw);
|
return sdbWrite(pMnode->pSdb, pRaw);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetClusterMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
strcpy(pSchema[cols].name, "id");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "name");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = 1;
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pMsg->pNode;
|
SMnode *pMnode = pMsg->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -35,7 +35,6 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
||||||
static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg);
|
static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg);
|
||||||
static int32_t mndGetConsumerMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,6 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessUseDbReq(SNodeMsg *pReq);
|
static int32_t mndProcessUseDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessSyncDbReq(SNodeMsg *pReq);
|
static int32_t mndProcessSyncDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
||||||
|
@ -59,7 +58,6 @@ int32_t mndInitDb(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb);
|
||||||
|
|
||||||
|
@ -1339,136 +1337,6 @@ SYNC_DB_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "name");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "vgroups");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "ntables");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "replica");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "quorum");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "days");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "keep0,keep1,keep2");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "cache");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "blocks");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "minrows");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "maxrows");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 1;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
|
||||||
strcpy(pSchema[cols].name, "wallevel");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "fsync");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 1;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
|
||||||
strcpy(pSchema[cols].name, "comp");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 1;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
|
||||||
strcpy(pSchema[cols].name, "cachelast");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 3 + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "precision");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
// pShow->bytes[cols] = 1;
|
|
||||||
// pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
|
|
||||||
// strcpy(pSchema[cols].name, "update");
|
|
||||||
// pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
// cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *mnGetDbStr(char *src) {
|
char *mnGetDbStr(char *src) {
|
||||||
char *pos = strstr(src, TS_PATH_DELIMITER);
|
char *pos = strstr(src, TS_PATH_DELIMITER);
|
||||||
if (pos != NULL) ++pos;
|
if (pos != NULL) ++pos;
|
||||||
|
|
|
@ -58,7 +58,6 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndGetDnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -78,10 +77,8 @@ int32_t mndInitDnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndGetConfigMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndCancelGetNextConfig);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndCancelGetNextConfig);
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndGetDnodeMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
|
||||||
|
|
||||||
|
@ -710,70 +707,6 @@ static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, i
|
||||||
|
|
||||||
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
|
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
|
||||||
|
|
||||||
static int32_t mndGetDnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "id");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "endpoint");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "vnodes");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "support_vnodes");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "status");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 256 + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "offline_reason");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -51,7 +51,6 @@ int32_t mndInitFunc(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,16 @@ static const SInfosTableSchema qnodesSchema[] = {
|
||||||
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
|
static const SInfosTableSchema bnodesSchema[] = {
|
||||||
|
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
};
|
||||||
|
static const SInfosTableSchema clusterSchema[] = {
|
||||||
|
{.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
};
|
||||||
static const SInfosTableSchema userDBSchema[] = {
|
static const SInfosTableSchema userDBSchema[] = {
|
||||||
{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
@ -183,6 +193,8 @@ static const SInfosTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
|
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
|
||||||
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
|
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
|
||||||
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
|
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
|
||||||
|
{TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
|
||||||
|
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
|
||||||
{TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)},
|
{TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)},
|
||||||
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
|
||||||
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
|
||||||
|
|
|
@ -35,7 +35,6 @@ static int32_t mndProcessDropMnodeReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessCreateMnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessCreateMnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessAlterMnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessAlterMnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndGetMnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -55,7 +54,6 @@ int32_t mndInitMnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
||||||
|
|
||||||
|
@ -610,59 +608,6 @@ static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetMnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "id");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "endpoint");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "role");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "role_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
mndUpdateMnodeRole(pMnode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -77,10 +77,8 @@ int32_t mndInitProfile(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,6 @@ static int32_t mndProcessCreateQnodeReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq);
|
static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessCreateQnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessCreateQnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndGetQnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessQnodeListReq(SNodeMsg *pReq);
|
static int32_t mndProcessQnodeListReq(SNodeMsg *pReq);
|
||||||
|
@ -53,7 +52,6 @@ int32_t mndInitQnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndGetQnodeMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndCancelGetNextQnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndCancelGetNextQnode);
|
||||||
|
|
||||||
|
@ -503,46 +501,6 @@ static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetQnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "id");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "endpoint");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -22,8 +22,6 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq);
|
||||||
static void mndFreeShowObj(SShowObj *pShow);
|
static void mndFreeShowObj(SShowObj *pShow);
|
||||||
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
|
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
|
||||||
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
|
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
|
||||||
static int32_t mndProcessShowReq(SNodeMsg *pReq);
|
|
||||||
static int32_t mndProcessRetrieveReq(SNodeMsg *pReq);
|
|
||||||
static bool mndCheckRetrieveFinished(SShowObj *pShow);
|
static bool mndCheckRetrieveFinished(SShowObj *pShow);
|
||||||
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
|
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
|
||||||
|
|
||||||
|
@ -37,8 +35,6 @@ int32_t mndInitShow(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SHOW, mndProcessShowReq);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_RETRIEVE, mndProcessRetrieveReq);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -117,67 +113,6 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
|
||||||
taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
|
taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessShowReq(SNodeMsg *pReq) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
|
||||||
int32_t code = -1;
|
|
||||||
SShowReq showReq = {0};
|
|
||||||
SShowRsp showRsp = {0};
|
|
||||||
|
|
||||||
if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto SHOW_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (showReq.type <= TSDB_MGMT_TABLE_START || showReq.type >= TSDB_MGMT_TABLE_MAX) {
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
|
|
||||||
goto SHOW_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
ShowMetaFp metaFp = pMgmt->metaFps[showReq.type];
|
|
||||||
if (metaFp == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
|
|
||||||
goto SHOW_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
SShowObj *pShow = mndCreateShowObj(pMnode, &showReq);
|
|
||||||
if (pShow == NULL) {
|
|
||||||
goto SHOW_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
showRsp.showId = pShow->id;
|
|
||||||
showRsp.tableMeta.pSchemas = taosMemoryCalloc(TSDB_MAX_COLUMNS, sizeof(SSchema));
|
|
||||||
if (showRsp.tableMeta.pSchemas == NULL) {
|
|
||||||
mndReleaseShowObj(pShow, true);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto SHOW_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = (*metaFp)(pReq, pShow, &showRsp.tableMeta);
|
|
||||||
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d showReq.type:%s, result:%s", pShow->id,
|
|
||||||
pShow->numOfRows, pShow->numOfColumns, mndShowStr(showReq.type), tstrerror(code));
|
|
||||||
|
|
||||||
if (code == 0) {
|
|
||||||
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
|
|
||||||
void *pBuf = rpcMallocCont(bufLen);
|
|
||||||
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
|
|
||||||
pReq->rspLen = bufLen;
|
|
||||||
pReq->pRsp = pBuf;
|
|
||||||
mndReleaseShowObj(pShow, false);
|
|
||||||
} else {
|
|
||||||
mndReleaseShowObj(pShow, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
SHOW_OVER:
|
|
||||||
if (code != 0) {
|
|
||||||
mError("failed to process show-meta req since %s", terrstr());
|
|
||||||
}
|
|
||||||
|
|
||||||
tFreeSShowReq(&showReq);
|
|
||||||
tFreeSShowRsp(&showRsp);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessRetrieveReq(SNodeMsg *pReq) {
|
static int32_t mndProcessRetrieveReq(SNodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||||
|
@ -458,11 +393,6 @@ void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capaci
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
|
|
||||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
|
||||||
pMgmt->metaFps[showType] = fp;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
|
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
|
||||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||||
pMgmt->retrieveFps[showType] = fp;
|
pMgmt->retrieveFps[showType] = fp;
|
||||||
|
|
|
@ -58,7 +58,6 @@ int32_t mndInitSma(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndGetSmaMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
|
|
@ -33,7 +33,6 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq);
|
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -51,7 +50,6 @@ int32_t mndInitSnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndGetSnodeMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);
|
||||||
|
|
||||||
|
@ -452,46 +450,6 @@ static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 2;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
|
||||||
strcpy(pSchema[cols].name, "id");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "endpoint");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -39,7 +39,6 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp);
|
static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndProcessTableMetaReq(SNodeMsg *pReq);
|
static int32_t mndProcessTableMetaReq(SNodeMsg *pReq);
|
||||||
static int32_t mndGetStbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -60,7 +59,6 @@ int32_t mndInitStb(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
|
||||||
|
|
||||||
|
@ -508,32 +506,32 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
|
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
|
||||||
SField *pField = taosArrayGet(pCreate->pColumns, i);
|
SField *pField1 = taosArrayGet(pCreate->pColumns, i);
|
||||||
if (pField->type < 0) {
|
if (pField->type < 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pField->bytes <= 0) {
|
if (pField1->bytes <= 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pField->name[0] == 0) {
|
if (pField1->name[0] == 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
|
for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
|
||||||
SField *pField = taosArrayGet(pCreate->pTags, i);
|
SField *pField1 = taosArrayGet(pCreate->pTags, i);
|
||||||
if (pField->type < 0) {
|
if (pField1->type < 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pField->bytes <= 0) {
|
if (pField1->bytes <= 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pField->name[0] == 0) {
|
if (pField1->name[0] == 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1315,7 +1313,6 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int32_t contLen;
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
@ -1628,56 +1625,6 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetStbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "name");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 8;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
strcpy(pSchema[cols].name, "create_time");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "columns");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "tags");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mndExtractTableName(char *tableId, char *name) {
|
static void mndExtractTableName(char *tableId, char *name) {
|
||||||
int32_t pos = -1;
|
int32_t pos = -1;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
|
|
@ -58,7 +58,6 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetStreamMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream);
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,6 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,6 @@ int32_t mndInitTrans(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
|
|
@ -54,7 +54,6 @@ int32_t mndInitUser(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_USER, mndProcessDropUserReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_USER, mndProcessDropUserReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_AUTH, mndProcessGetUserAuthReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_AUTH, mndProcessGetUserAuthReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_USER, mndGetUserMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
|
|
|
@ -38,7 +38,6 @@ static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);
|
||||||
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
|
||||||
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
@ -57,10 +56,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_SYNC_VNODE_RSP, mndProcessSyncVnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_SYNC_VNODE_RSP, mndProcessSyncVnodeRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndGetVgroupMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndGetVnodeMeta);
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);
|
||||||
|
|
||||||
|
@ -656,46 +653,6 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
|
||||||
return numOfVnodes;
|
return numOfVnodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
|
||||||
SMnode *pMnode = pReq->pNode;
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
int32_t cols = 0;
|
|
||||||
SSchema *pSchema = pMeta->pSchemas;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 4;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
|
||||||
strcpy(pSchema[cols].name, "vgId");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
|
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
|
||||||
strcpy(pSchema[cols].name, "status");
|
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
|
||||||
pShow->numOfColumns = cols;
|
|
||||||
|
|
||||||
pShow->offset[0] = 0;
|
|
||||||
for (int32_t i = 1; i < cols; ++i) {
|
|
||||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeId = 0;
|
|
||||||
if (pShow->payloadLen > 0) {
|
|
||||||
dnodeId = atoi(pShow->payload);
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->replica = dnodeId;
|
|
||||||
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
|
||||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->pNode;
|
SMnode *pMnode = pReq->pNode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -184,8 +184,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
||||||
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
|
|
||||||
int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
|
||||||
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn);
|
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn);
|
||||||
|
@ -674,36 +672,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
|
||||||
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
|
|
||||||
STimeWindow* win, bool masterscan, SResultRow** pResult, int64_t tableGroupId,
|
|
||||||
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
|
||||||
assert(win->skey <= win->ekey);
|
|
||||||
SDiskbasedBuf* pResultBuf = pRuntimeEnv->pResultBuf;
|
|
||||||
|
|
||||||
SResultRow* pResultRow = NULL;//doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&win->skey, TSDB_KEYSIZE,
|
|
||||||
// masterscan, tableGroupId);
|
|
||||||
if (pResultRow == NULL) {
|
|
||||||
*pResult = NULL;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// not assign result buffer yet, add new result buffer
|
|
||||||
if (pResultRow->pageId == -1) {
|
|
||||||
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t)tableGroupId,
|
|
||||||
pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// set time window for current result
|
|
||||||
pResultRow->win = (*win);
|
|
||||||
*pResult = pResultRow;
|
|
||||||
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
static void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||||
|
|
||||||
static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t id, STimeWindow* win, bool masterscan,
|
static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t id, STimeWindow* win, bool masterscan,
|
||||||
|
@ -1588,8 +1556,8 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// null data, failed to allocate more memory buffer
|
// null data, failed to allocate more memory buffer
|
||||||
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
|
// ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
|
||||||
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
|
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
@ -2779,14 +2747,10 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num
|
||||||
}
|
}
|
||||||
|
|
||||||
void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
|
||||||
|
|
||||||
SExprInfo* pExpr = pOperatorInfo->pExpr;
|
SExprInfo* pExpr = pOperatorInfo->pExpr;
|
||||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pExpr[0];
|
SExprInfo* pExprInfo = &pExpr[0];
|
||||||
int32_t functionId = getExprFunctionId(pExprInfo);
|
int32_t functionId = getExprFunctionId(pExprInfo);
|
||||||
|
#if 0
|
||||||
if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) {
|
if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) {
|
||||||
assert(pExprInfo->base.numOfParams == 1);
|
assert(pExprInfo->base.numOfParams == 1);
|
||||||
|
|
||||||
|
@ -2831,6 +2795,8 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt
|
||||||
if (pRuntimeEnv->pTsBuf != NULL) {
|
if (pRuntimeEnv->pTsBuf != NULL) {
|
||||||
setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable);
|
setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
|
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
|
||||||
|
@ -3131,39 +3097,6 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
|
||||||
cleanupResultRowInfo(&pTableQueryInfo->resInfo);
|
cleanupResultRowInfo(&pTableQueryInfo->resInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
|
|
||||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
|
||||||
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
|
|
||||||
SFilePage* bufPage = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
|
|
||||||
|
|
||||||
int32_t offset = 0;
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
|
|
||||||
|
|
||||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
|
||||||
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
|
||||||
offset += pCtx[i].resDataInfo.bytes;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, bufPage, pResult->offset, offset);
|
|
||||||
offset += pCtx[i].resDataInfo.bytes;
|
|
||||||
|
|
||||||
int32_t functionId = pCtx[i].functionId;
|
|
||||||
if (functionId < 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
|
||||||
// if (i > 0) pCtx[i].pTsOutput = pCtx[i - 1].pOutput;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if (!pResInfo->initialized) {
|
|
||||||
// aAggs[functionId].init(&pCtx[i], pResInfo);
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
|
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
|
||||||
|
|
Loading…
Reference in New Issue