[td-13039] fix bug in show.
This commit is contained in:
parent
9de235f3e4
commit
fbf664e273
|
@ -30,13 +30,13 @@ typedef int64_t tb_uid_t;
|
||||||
#define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX))
|
#define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX))
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_SUPER_TABLE = 1, // super table
|
TSDB_SUPER_TABLE = 1, // super table
|
||||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||||
TSDB_SYSTEM_TABLE = 6,
|
TSDB_SYSTEM_TABLE = 6,
|
||||||
TSDB_TABLE_MAX = 7
|
TSDB_TABLE_MAX = 7
|
||||||
} ETableType;
|
} ETableType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -872,6 +872,17 @@ typedef struct {
|
||||||
char data[];
|
char data[];
|
||||||
} SRetrieveTableRsp;
|
} SRetrieveTableRsp;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t handle;
|
||||||
|
int64_t useconds;
|
||||||
|
int8_t completed; // all results are returned to client
|
||||||
|
int8_t precision;
|
||||||
|
int8_t compressed;
|
||||||
|
int32_t compLen;
|
||||||
|
int32_t numOfRows;
|
||||||
|
char data[];
|
||||||
|
} SRetrieveMetaTableRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
||||||
int32_t port;
|
int32_t port;
|
||||||
|
|
|
@ -1732,6 +1732,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->type) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->free) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->free) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -1746,6 +1747,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->type) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->free) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->free) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
|
@ -102,6 +102,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYSTABLE_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
|
||||||
|
|
|
@ -1324,123 +1324,133 @@ char *mnGetDbStr(char *src) {
|
||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
|
||||||
|
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity) {
|
||||||
|
int32_t cols = 0;
|
||||||
|
|
||||||
|
char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
char *name = mnGetDbStr(pDb->name);
|
||||||
|
if (name != NULL) {
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
||||||
|
} else {
|
||||||
|
STR_TO_VARSTR(pWrite, "NULL");
|
||||||
|
}
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int64_t *)pWrite = pDb->createdTime;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int64_t *)pWrite = 0; // todo: num of Tables
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int16_t *)pWrite = pDb->cfg.replications;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int16_t *)pWrite = pDb->cfg.quorum;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int16_t *)pWrite = pDb->cfg.daysPerFile;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
char tmp[128] = {0};
|
||||||
|
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
|
||||||
|
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
|
||||||
|
} else {
|
||||||
|
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
|
||||||
|
}
|
||||||
|
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int32_t *)pWrite = pDb->cfg.minRows;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int32_t *)pWrite = pDb->cfg.maxRows;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.walLevel;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.compression;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
char *prec = NULL;
|
||||||
|
switch (pDb->cfg.precision) {
|
||||||
|
case TSDB_TIME_PRECISION_MILLI:
|
||||||
|
prec = TSDB_TIME_PRECISION_MILLI_STR;
|
||||||
|
break;
|
||||||
|
case TSDB_TIME_PRECISION_MICRO:
|
||||||
|
prec = TSDB_TIME_PRECISION_MICRO_STR;
|
||||||
|
break;
|
||||||
|
case TSDB_TIME_PRECISION_NANO:
|
||||||
|
prec = TSDB_TIME_PRECISION_NANO_STR;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
prec = "none";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.update;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
char *pWrite;
|
|
||||||
int32_t cols = 0;
|
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rowsCapacity) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
|
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) {
|
||||||
|
break;
|
||||||
cols = 0;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
char *name = mnGetDbStr(pDb->name);
|
|
||||||
if (name != NULL) {
|
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
|
||||||
} else {
|
|
||||||
STR_TO_VARSTR(pWrite, "NULL");
|
|
||||||
}
|
}
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity);
|
||||||
*(int64_t *)pWrite = pDb->createdTime;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int16_t *)pWrite = 0; // todo
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int16_t *)pWrite = pDb->cfg.replications;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int16_t *)pWrite = pDb->cfg.quorum;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int16_t *)pWrite = pDb->cfg.daysPerFile;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
char tmp[128] = {0};
|
|
||||||
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
|
|
||||||
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
|
|
||||||
} else {
|
|
||||||
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
|
|
||||||
}
|
|
||||||
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int32_t *)pWrite = pDb->cfg.minRows;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int32_t *)pWrite = pDb->cfg.maxRows;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int8_t *)pWrite = pDb->cfg.walLevel;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int8_t *)pWrite = pDb->cfg.compression;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
char *prec = NULL;
|
|
||||||
switch (pDb->cfg.precision) {
|
|
||||||
case TSDB_TIME_PRECISION_MILLI:
|
|
||||||
prec = TSDB_TIME_PRECISION_MILLI_STR;
|
|
||||||
break;
|
|
||||||
case TSDB_TIME_PRECISION_MICRO:
|
|
||||||
prec = TSDB_TIME_PRECISION_MICRO_STR;
|
|
||||||
break;
|
|
||||||
case TSDB_TIME_PRECISION_NANO:
|
|
||||||
prec = TSDB_TIME_PRECISION_NANO_STR;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
prec = "none";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(int8_t *)pWrite = pDb->cfg.update;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
// Append the information_schema database into the result.
|
||||||
|
|
||||||
|
|
||||||
|
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow);
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
|
|
@ -39,24 +39,24 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt
|
||||||
{.name = "end_point", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "end_point", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "vgroups", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "replica", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "replica", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "quorum", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "quorum", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "days", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "days", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
|
||||||
{.name = "keep", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "keep", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "cache", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "cache", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "wallevel", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "wallevel", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "comp", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
{.name = "cachelast", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
{.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
|
|
@ -284,6 +284,20 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
|
||||||
strncpy(req.db, retrieveReq.db, tListLen(req.db));
|
strncpy(req.db, retrieveReq.db, tListLen(req.db));
|
||||||
|
|
||||||
pShow = mndCreateShowObj(pMnode, &req);
|
pShow = mndCreateShowObj(pMnode, &req);
|
||||||
|
STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, TSDB_INS_TABLE_USER_DATABASES, strlen(TSDB_INS_TABLE_USER_DATABASES));
|
||||||
|
pShow->numOfRows = 100;
|
||||||
|
|
||||||
|
int32_t offset = 0;
|
||||||
|
for(int32_t i = 0; i < meta->numOfColumns; ++i) {
|
||||||
|
pShow->numOfColumns = meta->numOfColumns;
|
||||||
|
pShow->offset[i] = offset;
|
||||||
|
|
||||||
|
int32_t bytes = meta->pSchemas[i].bytes;
|
||||||
|
pShow->rowSize += bytes;
|
||||||
|
pShow->bytes[i] = bytes;
|
||||||
|
offset += bytes;
|
||||||
|
}
|
||||||
|
|
||||||
if (pShow == NULL) {
|
if (pShow == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed to process show-meta req since %s", terrstr());
|
mError("failed to process show-meta req since %s", terrstr());
|
||||||
|
@ -330,7 +344,7 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
|
||||||
size = pShow->rowSize * rowsToRead;
|
size = pShow->rowSize * rowsToRead;
|
||||||
|
|
||||||
size += SHOW_STEP_SIZE;
|
size += SHOW_STEP_SIZE;
|
||||||
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
|
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseShowObj((SShowObj*) pShow, false);
|
mndReleaseShowObj((SShowObj*) pShow, false);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -338,6 +352,8 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRsp->handle = htobe64(pShow->id);
|
||||||
|
|
||||||
// if free flag is set, client wants to clean the resources
|
// if free flag is set, client wants to clean the resources
|
||||||
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||||
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
|
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
|
||||||
|
|
|
@ -391,6 +391,12 @@ typedef struct SSourceDataInfo {
|
||||||
int32_t status;
|
int32_t status;
|
||||||
} SSourceDataInfo;
|
} SSourceDataInfo;
|
||||||
|
|
||||||
|
typedef struct SLoadRemoteDataInfo {
|
||||||
|
uint64_t totalSize; // total load bytes from remote
|
||||||
|
uint64_t totalRows; // total number of rows
|
||||||
|
uint64_t totalElapsed; // total elapsed time
|
||||||
|
} SLoadRemoteDataInfo;
|
||||||
|
|
||||||
typedef struct SExchangeInfo {
|
typedef struct SExchangeInfo {
|
||||||
SArray* pSources;
|
SArray* pSources;
|
||||||
SArray* pSourceDataInfo;
|
SArray* pSourceDataInfo;
|
||||||
|
@ -399,9 +405,7 @@ typedef struct SExchangeInfo {
|
||||||
SSDataBlock* pResult;
|
SSDataBlock* pResult;
|
||||||
bool seqLoadData; // sequential load data or not, false by default
|
bool seqLoadData; // sequential load data or not, false by default
|
||||||
int32_t current;
|
int32_t current;
|
||||||
uint64_t totalSize; // total load bytes from remote
|
SLoadRemoteDataInfo loadInfo;
|
||||||
uint64_t totalRows; // total number of rows
|
|
||||||
uint64_t totalElapsed; // total elapsed time
|
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
|
@ -440,14 +444,23 @@ typedef struct SStreamBlockScanInfo {
|
||||||
void* readerHandle; // stream block reader handle
|
void* readerHandle; // stream block reader handle
|
||||||
} SStreamBlockScanInfo;
|
} SStreamBlockScanInfo;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct SSysScanResInfo {
|
||||||
|
struct SSysTableScanInfo *pSysScanInfo;
|
||||||
|
SRetrieveTableRsp *pRsp;
|
||||||
|
uint64_t totalRows;
|
||||||
|
} SSysScanResInfo;
|
||||||
|
|
||||||
typedef struct SSysTableScanInfo {
|
typedef struct SSysTableScanInfo {
|
||||||
union {
|
union {
|
||||||
void* pTransporter;
|
void* pTransporter;
|
||||||
void* readHandle;
|
void* readHandle;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
SRetrieveMetaTableRsp *pRsp;
|
||||||
|
|
||||||
void *pCur; // cursor
|
void *pCur; // cursor
|
||||||
SRetrieveTableReq* pReq;
|
SRetrieveTableReq req;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
int32_t type; // show type
|
int32_t type; // show type
|
||||||
tsem_t ready;
|
tsem_t ready;
|
||||||
|
@ -457,8 +470,7 @@ typedef struct SSysTableScanInfo {
|
||||||
int32_t capacity;
|
int32_t capacity;
|
||||||
int64_t numOfBlocks; // extract basic running information.
|
int64_t numOfBlocks; // extract basic running information.
|
||||||
int64_t totalRows;
|
int64_t totalRows;
|
||||||
int64_t elapsedTime;
|
SLoadRemoteDataInfo loadInfo;
|
||||||
int64_t totalBytes;
|
|
||||||
} SSysTableScanInfo;
|
} SSysTableScanInfo;
|
||||||
|
|
||||||
typedef struct SOptrBasicInfo {
|
typedef struct SOptrBasicInfo {
|
||||||
|
@ -639,8 +651,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
|
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, SEpSet epset,
|
||||||
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
|
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -4951,34 +4951,37 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) {
|
static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen,
|
||||||
char* pData = pDataInfo->pRsp->data;
|
int32_t numOfOutput, int64_t startTs, uint64_t* total) {
|
||||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
// char* pData = pRsp->data;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||||
|
|
||||||
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows);
|
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t len = pRsp->numOfRows * pColInfoData->info.bytes;
|
size_t len = numOfRows * pColInfoData->info.bytes;
|
||||||
memcpy(tmp, pData, len);
|
memcpy(tmp, pData, len);
|
||||||
|
|
||||||
pColInfoData->pData = tmp;
|
pColInfoData->pData = tmp;
|
||||||
pData += len;
|
pData += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = pRsp->numOfRows;
|
pRes->info.rows = numOfRows;
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - startTs;
|
int64_t el = taosGetTimestampUs() - startTs;
|
||||||
|
|
||||||
pExchangeInfo->totalRows += pRsp->numOfRows;
|
pLoadInfo->totalRows += numOfRows;
|
||||||
pExchangeInfo->totalSize += pRsp->compLen;
|
pLoadInfo->totalSize += compLen;
|
||||||
pDataInfo->totalRows += pRsp->numOfRows;
|
|
||||||
|
|
||||||
pExchangeInfo->totalElapsed += el;
|
if (total != NULL) {
|
||||||
|
*total += numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
pLoadInfo->totalElapsed += el;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -4988,11 +4991,12 @@ static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - startTs;
|
int64_t el = taosGetTimestampUs() - startTs;
|
||||||
pExchangeInfo->totalElapsed += el;
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
|
pLoadInfo->totalElapsed += el;
|
||||||
|
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||||
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
||||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
|
||||||
|
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5021,17 +5025,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
|
||||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
|
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
|
||||||
|
|
||||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||||
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
if (pRsp->numOfRows == 0) {
|
if (pRsp->numOfRows == 0) {
|
||||||
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
|
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
|
||||||
pExchangeInfo->totalRows);
|
pExchangeInfo->loadInfo.totalRows);
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = DATA_EXHAUSTED;
|
||||||
completed += 1;
|
completed += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
|
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||||
|
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
|
||||||
|
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -5040,13 +5046,13 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
|
||||||
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1,
|
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1,
|
||||||
totalSources);
|
totalSources);
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = DATA_EXHAUSTED;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
|
||||||
pExchangeInfo->totalSize);
|
pLoadInfo->totalSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataInfo->status != DATA_EXHAUSTED) {
|
if (pDataInfo->status != DATA_EXHAUSTED) {
|
||||||
|
@ -5118,10 +5124,12 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
|
||||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||||
|
|
||||||
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
|
||||||
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
|
|
||||||
if (pRsp->numOfRows == 0) {
|
if (pRsp->numOfRows == 0) {
|
||||||
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||||
pDataInfo->totalRows, pExchangeInfo->totalRows);
|
pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||||
|
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = DATA_EXHAUSTED;
|
||||||
pExchangeInfo->current += 1;
|
pExchangeInfo->current += 1;
|
||||||
|
@ -5129,20 +5137,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||||
setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs);
|
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||||
|
int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows,
|
||||||
|
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows);
|
||||||
|
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows,
|
||||||
pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1,
|
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
|
||||||
totalSources);
|
totalSources);
|
||||||
|
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = DATA_EXHAUSTED;
|
||||||
pExchangeInfo->current += 1;
|
pExchangeInfo->current += 1;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize);
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pExchangeInfo->pResult;
|
return pExchangeInfo->pResult;
|
||||||
|
@ -5156,10 +5166,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
||||||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||||
|
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources,
|
||||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5405,18 +5416,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param;
|
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param;
|
||||||
pSourceDataInfo->pRsp = pMsg->pData;
|
pScanResInfo->pRsp = pMsg->pData;
|
||||||
|
|
||||||
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
|
SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp;
|
||||||
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
pRsp->numOfRows = htonl(pRsp->numOfRows);
|
||||||
pRsp->useconds = htobe64(pRsp->useconds);
|
pRsp->useconds = htobe64(pRsp->useconds);
|
||||||
|
pRsp->handle = htobe64(pRsp->handle);
|
||||||
pRsp->compLen = htonl(pRsp->compLen);
|
pRsp->compLen = htonl(pRsp->compLen);
|
||||||
|
tsem_post(&pScanResInfo->ready);
|
||||||
pSourceDataInfo->status = DATA_READY;
|
|
||||||
tsem_post(&pSourceDataInfo->pEx->ready);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||||
|
@ -5450,15 +5459,12 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||||
// pInfo->totalBytes;
|
// pInfo->totalBytes;
|
||||||
return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes;
|
||||||
} else { // load the meta from mnode of the given epset
|
} else { // load the meta from mnode of the given epset
|
||||||
if (pInfo->pReq == NULL) {
|
int64_t startTs = taosGetTimestampUs();
|
||||||
pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq));
|
|
||||||
if (pInfo->pReq == NULL) {
|
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->pReq->type = pInfo->type;
|
pInfo->req.type = pInfo->type;
|
||||||
}
|
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||||
|
char* buf1 = calloc(1, contLen);
|
||||||
|
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||||
|
|
||||||
// send the fetch remote task result reques
|
// send the fetch remote task result reques
|
||||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||||
|
@ -5468,24 +5474,40 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsgSendInfo->param = NULL;
|
pMsgSendInfo->param = pInfo;
|
||||||
pMsgSendInfo->msgInfo.pData = pInfo->pReq;
|
pMsgSendInfo->msgInfo.pData = buf1;
|
||||||
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
|
pMsgSendInfo->msgInfo.len = contLen;
|
||||||
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
|
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
|
||||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
pMsgSendInfo->fp = loadSysTableContentCb;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
|
||||||
|
|
||||||
tsem_wait(&pInfo->ready);
|
tsem_wait(&pInfo->ready);
|
||||||
// handle the response and return to the caller
|
|
||||||
|
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
|
||||||
|
pInfo->req.showId = pRsp->handle;
|
||||||
|
|
||||||
|
if (pRsp->numOfRows == 0) {
|
||||||
|
// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next",
|
||||||
|
// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||||
|
// pDataInfo->totalRows, pExchangeInfo->totalRows);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
|
||||||
|
setSDataBlockFromFetchRsp(pRes, &pInfo->loadInfo, pTableRsp->numOfRows,
|
||||||
|
pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL);
|
||||||
|
|
||||||
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema,
|
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType,
|
||||||
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) {
|
SEpSet epset, SExecTaskInfo* pTaskInfo) {
|
||||||
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
|
SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo));
|
||||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -5495,7 +5517,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: create the schema of result data block
|
pInfo->pRes = pResBlock;
|
||||||
pInfo->capacity = 4096;
|
pInfo->capacity = 4096;
|
||||||
pInfo->type = tableType;
|
pInfo->type = tableType;
|
||||||
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
|
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) {
|
||||||
|
@ -5512,11 +5534,34 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
|
||||||
pOperator->blockingOptr = false;
|
pOperator->blockingOptr = false;
|
||||||
pOperator->status = OP_IN_EXECUTING;
|
pOperator->status = OP_IN_EXECUTING;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
|
pOperator->numOfOutput = pResBlock->info.numOfCols;
|
||||||
pOperator->nextDataFn = doSysTableScan;
|
pOperator->nextDataFn = doSysTableScan;
|
||||||
pOperator->closeFn = destroySysTableScannerOperatorInfo;
|
pOperator->closeFn = destroySysTableScannerOperatorInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
{ // todo refactor
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "DB-META";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = qProcessFetchRsp;
|
||||||
|
rpcInit.sessions = tsMaxConnections;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.user = (char *)"root";
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
rpcInit.ckey = "key";
|
||||||
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
|
||||||
|
|
||||||
|
pInfo->pTransporter = rpcOpen(&rpcInit);
|
||||||
|
if (pInfo->pTransporter == NULL) {
|
||||||
|
return NULL; // todo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8125,6 +8170,14 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
||||||
|
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) {
|
||||||
|
SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||||
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, TSDB_MGMT_TABLE_DB, pSysScanPhyNode->mgmtEpSet, pTaskInfo);
|
||||||
|
return pOperator;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue