Merge pull request #11322 from taosdata/feature/3.0_liaohj

<fix>[query]: fix filter caused taosd crash.td-14566.
This commit is contained in:
Haojun Liao 2022-04-09 22:52:32 +08:00 committed by GitHub
commit 9d7d12b397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 258 additions and 1102 deletions

View File

@ -65,13 +65,14 @@ typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t rowSize;
int16_t numOfCols;
int16_t hasVarCol;
union {
int64_t uid;
int64_t uid; // from which table of uid, comes from this data block
int64_t blockId;
};
uint64_t groupId; // no need to serialize
int16_t numOfCols;
int16_t hasVarCol;
int16_t capacity;
} SDataBlockInfo;
typedef struct SSDataBlock {

View File

@ -197,7 +197,10 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);

View File

@ -99,6 +99,8 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_MNODES "mnodes"
#define TSDB_INS_TABLE_MODULES "modules"
#define TSDB_INS_TABLE_QNODES "qnodes"
#define TSDB_INS_TABLE_BNODES "bnodes"
#define TSDB_INS_TABLE_CLUSTER "cluster"
#define TSDB_INS_TABLE_USER_DATABASES "user_databases"
#define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions"
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"

View File

@ -84,6 +84,14 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
}
}
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
} else {
return pColumnInfoData->info.bytes * numOfRows + BitmapLen(numOfRows);
}
}
void colDataTrim(SColumnInfoData* pColumnInfoData) {
// TODO
}
@ -353,13 +361,7 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
total += colDataGetLength(pColInfoData, pBlock->info.rows);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
total += sizeof(int32_t) * pBlock->info.rows;
} else {
total += BitmapLen(pBlock->info.rows);
}
total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
}
return total;
@ -656,10 +658,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
return rowSize;
}
int32_t getAllowedRowsForPage(const SSDataBlock* pBlock, size_t pgSize) {
return (int32_t) ((pgSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock));
}
typedef struct SSDataBlockSortHelper {
SArray* orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock* pDataBlock;
@ -1071,15 +1069,9 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0;
if (pDataBlock->info.hasVarCol) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(p->info.type)) {
p->varmeta.length = 0;
}
}
colInfoDataCleanup(p, pDataBlock->info.capacity);
}
}
@ -1120,12 +1112,22 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
return TSDB_CODE_SUCCESS;
}
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0;
} else {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
}
}
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0;
if (numOfRows == 0) {
return TSDB_CODE_SUCCESS;
}
pDataBlock->info.capacity = numOfRows;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = colInfoDataEnsureCapacity(p, numOfRows);

View File

@ -69,7 +69,6 @@ typedef struct {
typedef struct {
int64_t showId;
ShowMetaFp metaFps[TSDB_MGMT_TABLE_MAX];
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
SCacheObj *cache;

View File

@ -24,7 +24,6 @@ extern "C" {
int32_t mndInitShow(SMnode *pMnode);
void mndCleanupShow(SMnode *pMnode);
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);

View File

@ -51,7 +51,6 @@ int32_t mndInitBnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndGetBnodeMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);
@ -439,46 +438,6 @@ static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -26,7 +26,6 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndGetClusterMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
@ -40,7 +39,6 @@ int32_t mndInitCluster(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndGetClusterMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
return sdbSetTable(pMnode->pSdb, table);
@ -180,44 +178,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndGetClusterMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
strcpy(pMeta->tbName, mndShowStr(pShow->type));
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = 1;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -35,7 +35,6 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg);
static int32_t mndGetConsumerMeta(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);

View File

@ -38,7 +38,6 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq);
static int32_t mndProcessUseDbReq(SNodeMsg *pReq);
static int32_t mndProcessSyncDbReq(SNodeMsg *pReq);
static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
@ -62,7 +61,6 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetIndexReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb);
@ -1342,136 +1340,6 @@ SYNC_DB_OVER:
return code;
}
static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vgroups");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "ntables");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "replica");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "quorum");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "days");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "keep0,keep1,keep2");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cache");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "blocks");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "minrows");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "maxrows");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "wallevel");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "fsync");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "comp");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "cachelast");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 3 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "precision");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
// pShow->bytes[cols] = 1;
// pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
// strcpy(pSchema[cols].name, "update");
// pSchema[cols].bytes = pShow->bytes[cols];
// cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
char *mnGetDbStr(char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER);
if (pos != NULL) ++pos;

View File

@ -58,7 +58,6 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq);
static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static int32_t mndGetDnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
@ -78,10 +77,8 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndGetConfigMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndRetrieveConfigs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VARIABLES, mndCancelGetNextConfig);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndGetDnodeMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndRetrieveDnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DNODE, mndCancelGetNextDnode);
@ -710,70 +707,6 @@ static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, i
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
static int32_t mndGetDnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vnodes");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "support_vnodes");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 256 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "offline_reason");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -51,7 +51,6 @@ int32_t mndInitFunc(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndGetFuncMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);

View File

@ -57,6 +57,11 @@ static const SInfosTableSchema bnodesSchema[] = {
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};
static const SInfosTableSchema clusterSchema[] = {
{.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
};
static const SInfosTableSchema userDBSchema[] = {
{.name = "name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
@ -194,8 +199,9 @@ static const SInfosTableMeta infosMeta[] = {
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
{TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
{TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
{TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(snodesSchema)},
{TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},

View File

@ -35,7 +35,6 @@ static int32_t mndProcessDropMnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetMnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
@ -55,7 +54,6 @@ int32_t mndInitMnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
@ -610,59 +608,6 @@ static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndGetMnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "role_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
mndUpdateMnodeRole(pMnode);
return 0;
}
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -77,10 +77,8 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);

View File

@ -34,7 +34,6 @@ static int32_t mndProcessCreateQnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessQnodeListReq(SNodeMsg *pReq);
static int32_t mndGetQnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
@ -53,7 +52,6 @@ int32_t mndInitQnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndGetQnodeMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndCancelGetNextQnode);
@ -499,46 +497,6 @@ static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndGetQnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -22,8 +22,6 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq);
static void mndFreeShowObj(SShowObj *pShow);
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static int32_t mndProcessShowReq(SNodeMsg *pReq);
static int32_t mndProcessRetrieveReq(SNodeMsg *pReq);
static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq);
@ -37,8 +35,6 @@ int32_t mndInitShow(SMnode *pMnode) {
return -1;
}
mndSetMsgHandle(pMnode, TDMT_MND_SHOW, mndProcessShowReq);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_RETRIEVE, mndProcessRetrieveReq);
mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
return 0;
}
@ -117,67 +113,6 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove);
}
static int32_t mndProcessShowReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t code = -1;
SShowReq showReq = {0};
SShowRsp showRsp = {0};
if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto SHOW_OVER;
}
if (showReq.type <= TSDB_MGMT_TABLE_START || showReq.type >= TSDB_MGMT_TABLE_MAX) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
goto SHOW_OVER;
}
ShowMetaFp metaFp = pMgmt->metaFps[showReq.type];
if (metaFp == NULL) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
goto SHOW_OVER;
}
SShowObj *pShow = mndCreateShowObj(pMnode, &showReq);
if (pShow == NULL) {
goto SHOW_OVER;
}
showRsp.showId = pShow->id;
showRsp.tableMeta.pSchemas = taosMemoryCalloc(TSDB_MAX_COLUMNS, sizeof(SSchema));
if (showRsp.tableMeta.pSchemas == NULL) {
mndReleaseShowObj(pShow, true);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto SHOW_OVER;
}
code = (*metaFp)(pReq, pShow, &showRsp.tableMeta);
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d showReq.type:%s, result:%s", pShow->id,
pShow->numOfRows, pShow->numOfColumns, mndShowStr(showReq.type), tstrerror(code));
if (code == 0) {
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
void *pBuf = rpcMallocCont(bufLen);
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
pReq->rspLen = bufLen;
pReq->pRsp = pBuf;
mndReleaseShowObj(pShow, false);
} else {
mndReleaseShowObj(pShow, true);
}
SHOW_OVER:
if (code != 0) {
mError("failed to process show-meta req since %s", terrstr());
}
tFreeSShowReq(&showReq);
tFreeSShowRsp(&showRsp);
return code;
}
static int32_t mndProcessRetrieveReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
@ -458,11 +393,6 @@ void mndVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capaci
}
}
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->metaFps[showType] = fp;
}
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->retrieveFps[showType] = fp;

View File

@ -58,7 +58,6 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndGetSmaMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
return sdbSetTable(pMnode->pSdb, table);

View File

@ -33,7 +33,6 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
@ -51,7 +50,6 @@ int32_t mndInitSnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndGetSnodeMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);
@ -449,46 +447,6 @@ static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -39,7 +39,6 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessTableMetaReq(SNodeMsg *pReq);
static int32_t mndGetStbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
@ -60,7 +59,6 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
@ -508,32 +506,32 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
}
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
SField *pField = taosArrayGet(pCreate->pColumns, i);
SField *pField1 = taosArrayGet(pCreate->pColumns, i);
if (pField->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pField->bytes <= 0) {
if (pField1->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pField->name[0] == 0) {
if (pField1->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
}
for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
SField *pField = taosArrayGet(pCreate->pTags, i);
if (pField->type < 0) {
SField *pField1 = taosArrayGet(pCreate->pTags, i);
if (pField1->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pField->bytes <= 0) {
if (pField1->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pField->name[0] == 0) {
if (pField1->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
@ -1315,7 +1313,6 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
int32_t contLen;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
@ -1628,56 +1625,6 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
return 0;
}
static int32_t mndGetStbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static void mndExtractTableName(char *tableId, char *name) {
int32_t pos = -1;
int32_t num = 0;

View File

@ -58,7 +58,6 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetStreamMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream);

View File

@ -53,7 +53,6 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);

View File

@ -74,7 +74,6 @@ int32_t mndInitTrans(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
return sdbSetTable(pMnode->pSdb, table);

View File

@ -54,7 +54,6 @@ int32_t mndInitUser(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_USER, mndProcessDropUserReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_USER_AUTH, mndProcessGetUserAuthReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_USER, mndGetUserMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser);
return sdbSetTable(pMnode->pSdb, table);

View File

@ -38,7 +38,6 @@ static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
@ -57,10 +56,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_SYNC_VNODE_RSP, mndProcessSyncVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndGetVgroupMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndGetVnodeMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndRetrieveVnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VNODES, mndCancelGetNextVnode);
@ -656,46 +653,6 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes;
}
static int32_t mndGetVnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vgId");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
int32_t dnodeId = 0;
if (pShow->payloadLen > 0) {
dnodeId = atoi(pShow->payload);
}
pShow->replica = dnodeId;
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;

View File

@ -1,17 +1,17 @@
enable_testing()
add_subdirectory(user)
add_subdirectory(acct)
add_subdirectory(trans)
add_subdirectory(qnode)
add_subdirectory(snode)
add_subdirectory(bnode)
add_subdirectory(show)
add_subdirectory(profile)
#add_subdirectory(user)
#add_subdirectory(acct)
#add_subdirectory(trans)
#add_subdirectory(qnode)
#add_subdirectory(snode)
#add_subdirectory(bnode)
#add_subdirectory(show)
#add_subdirectory(profile)
add_subdirectory(dnode)
add_subdirectory(mnode)
#add_subdirectory(mnode)
add_subdirectory(db)
add_subdirectory(stb)
add_subdirectory(sma)
add_subdirectory(func)
add_subdirectory(topic)
#add_subdirectory(sma)
#add_subdirectory(func)
#add_subdirectory(topic)

View File

@ -88,7 +88,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return 0;
}

View File

@ -2821,6 +2821,11 @@ static bool loadDataBlockFromTableSeq(STsdbReadHandle* pTsdbReadHandle) {
bool tsdbNextDataBlock(tsdbReaderT pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
for(int32_t i = 0; i < taosArrayGetSize(pTsdbReadHandle->pColumns); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
colInfoDataCleanup(pColInfo, pTsdbReadHandle->outputCapacity);
}
if (emptyQueryTimewindow(pTsdbReadHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
return false;
@ -3172,7 +3177,10 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
uid = pCheckInfo->tableId;
}
pDataBlockInfo->uid = uid;
tsdbDebug("data block generated, uid:%"PRIu64" numOfRows:%d, tsrange:%"PRId64" - %"PRId64" %s", uid, cur->rows, cur->win.skey,
cur->win.ekey, pHandle->idStr);
// pDataBlockInfo->uid = uid; // block Id may be over write by assigning uid fro this data block. Do NOT assign the table uid
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
@ -3246,7 +3254,6 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
* 1. data is from cache, 2. data block is not completed qualified to query time range
*/
STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
if (pHandle->cur.fid == INT32_MIN) {
return pHandle->pColumns;
} else {

View File

@ -550,12 +550,12 @@ typedef struct SFillOperatorInfo {
int32_t capacity;
} SFillOperatorInfo;
typedef struct SGroupKeys {
typedef struct {
char *pData;
bool isNull;
int16_t type;
int32_t bytes;
}SGroupKeys;
} SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
@ -592,14 +592,18 @@ typedef struct SPartitionOperatorInfo {
int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
STimeWindow win;
TSKEY prevTs;
int32_t startRowIndex;
int32_t numOfRows;
} SWindowRowsSup;
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
@ -607,12 +611,14 @@ typedef struct SSessionAggOperatorInfo {
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
int32_t numOfRows; // number of rows
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
int32_t colIndex; // start row index
int32_t start;
char* prevData; // previous data
bool reptScan;
bool hasKey;
SStateKeys stateKey;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
// bool reptScan;
} SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
@ -712,15 +718,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo,
bool groupResultMixedUp);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
#endif

View File

@ -94,8 +94,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
// NOTE: there are four bytes of an integer more than the required buffer space.
// struct size + data payload + length for each column + bitmap length
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) +
ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows);
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + blockDataGetSize(pInput->pData);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {

View File

@ -184,8 +184,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn);
@ -406,85 +404,6 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
return p1 != NULL;
}
#if 0
static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
SResultRow** p1 =
(SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
// in case of repeat scan/reverse scan, no new time window added.
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
return (p1 != NULL) ? *p1 : NULL;
}
if (p1 != NULL) {
if (pResultRowInfo->size == 0) {
existed = false;
// assert(pResultRowInfo->curPos == -1);
} else if (pResultRowInfo->size == 1) {
// existed = (pResultRowInfo->pResult[0] == (*p1));
// pResultRowInfo->curPos = 0;
} else { // check if current pResultRowInfo contains the existed pResultRow
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
int64_t* index =
taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
if (index != NULL) {
// pResultRowInfo->curPos = (int32_t)*index;
existed = true;
} else {
existed = false;
}
}
}
} else {
// In case of group by column query, the required SResultRow object must be existed in the pResultRowInfo object.
if (p1 != NULL) {
return *p1;
}
}
if (!existed) {
// prepareResultListBuffer(pResultRowInfo, pRuntimeEnv);
SResultRow* pResult = NULL;
if (p1 == NULL) {
pResult = getNewResultRow(pRuntimeEnv->pool);
int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// add a new result set for a new group
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult,
POINTER_BYTES);
SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult};
taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell);
} else {
pResult = *p1;
}
pResultRowInfo->curPos = pResultRowInfo->size;
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
int64_t index = pResultRowInfo->curPos;
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index,
POINTER_BYTES);
}
// too many time window in query
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
return pResultRowInfo->pResult[pResultRowInfo->curPos];
}
#endif
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SFilePage* pData = NULL;
@ -753,36 +672,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
}
static int32_t setResultOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
STimeWindow* win, bool masterscan, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
SDiskbasedBuf* pResultBuf = pRuntimeEnv->pResultBuf;
SResultRow* pResultRow = NULL;//doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&win->skey, TSDB_KEYSIZE,
// masterscan, tableGroupId);
if (pResultRow == NULL) {
*pResult = NULL;
return TSDB_CODE_SUCCESS;
}
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t)tableGroupId,
pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
}
// set time window for current result
pResultRow->win = (*win);
*pResult = pResultRow;
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS;
}
static void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t id, STimeWindow* win, bool masterscan,
@ -1270,7 +1159,7 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
pResult->info.rows = pCtx[0].input.numOfRows;
pResult->info.rows = pSrcBlock->info.rows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
@ -1667,8 +1556,8 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
while (1) {
// null data, failed to allocate more memory buffer
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
// ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@ -1716,22 +1605,23 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
static void doKeepTuple(SSessionAggOperatorInfo* pInfo, int64_t ts) {
pInfo->curWindow.ekey = ts;
pInfo->prevTs = ts;
pInfo->numOfRows += 1;
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) {
pRowSup->win.ekey = ts;
pRowSup->prevTs = ts;
pRowSup->numOfRows += 1;
}
static void doKeepSessionStartInfo(SSessionAggOperatorInfo* pInfo, const int64_t* tsList, int32_t rowIndex) {
pInfo->start = rowIndex;
pInfo->numOfRows = 0;
pInfo->curWindow.skey = tsList[rowIndex];
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex) {
pRowSup->startRowIndex = rowIndex;
pRowSup->numOfRows = 0;
pRowSup->win.skey = tsList[rowIndex];
}
// todo handle multiple tables cases.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// todo find the correct time stamp column slot
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
bool masterScan = true;
@ -1739,31 +1629,34 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
int64_t gid = pBlock->info.groupId;
int64_t gap = pInfo->gap;
pInfo->numOfRows = 0;
if (!pInfo->reptScan) {
pInfo->reptScan = true;
pInfo->prevTs = INT64_MIN;
pInfo->winSup.prevTs = INT64_MIN;
}
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
// In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) {
doKeepSessionStartInfo(pInfo, tsList, j);
doKeepTuple(pInfo, tsList[j]);
} else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) {
if (pInfo->winSup.prevTs == INT64_MIN) {
doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]);
} else if (tsList[j] - pRowSup->prevTs <= gap && (tsList[j] - pRowSup->prevTs) >= 0) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pInfo, tsList[j]);
if (j == 0 && pInfo->start != 0) {
pInfo->start = 0;
doKeepTuple(pRowSup, tsList[j]);
if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0;
}
} else { // start a new session window
SResultRow* pResult = NULL;
// keep the time window for the closed time window.
STimeWindow window = pInfo->curWindow;
STimeWindow window = pRowSup->win;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan,
&pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
@ -1772,24 +1665,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window
doKeepSessionStartInfo(pInfo, tsList, j);
doKeepTuple(pInfo, tsList[j]);
doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]);
}
}
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pInfo->curWindow, masterScan, &pResult,
pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pRowSup->win, masterScan, &pResult,
gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->timeWindowData, &pInfo->curWindow, false);
doApplyFunctions(pInfo->binfo.pCtx, &pInfo->curWindow, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
@ -2043,30 +1936,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// tsdbCleanupReadHandle(pRuntimeEnv->pTsdbReadHandle);
pRuntimeEnv->pTsdbReadHandle = NULL;
// SMemRef* pMemRef = &pQueryAttr->memRef;
// assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL);
}
static void destroyTsComp(STaskRuntimeEnv* pRuntimeEnv, STaskAttr* pQueryAttr) {
if (pQueryAttr->tsCompQuery && pRuntimeEnv->outputBuf && pRuntimeEnv->outputBuf->pDataBlock &&
taosArrayGetSize(pRuntimeEnv->outputBuf->pDataBlock) > 0) {
SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0);
if (pColInfoData) {
TdFilePtr pFile = *(TdFilePtr*)pColInfoData->pData; // TODO refactor
if (pFile != NULL) {
taosCloseFile(&pFile);
*(TdFilePtr*)pColInfoData->pData = NULL;
}
}
}
}
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
@ -2878,14 +2747,10 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num
}
void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SExprInfo* pExpr = pOperatorInfo->pExpr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SExprInfo* pExprInfo = &pExpr[0];
int32_t functionId = getExprFunctionId(pExprInfo);
#if 0
if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) {
assert(pExprInfo->base.numOfParams == 1);
@ -2930,6 +2795,8 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt
if (pRuntimeEnv->pTsBuf != NULL) {
setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable);
}
#endif
}
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
@ -3230,39 +3097,6 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* bufPage = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int32_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
offset += pCtx[i].resDataInfo.bytes;
continue;
}
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, bufPage, pResult->offset, offset);
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId < 0) {
continue;
}
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// if (i > 0) pCtx[i].pTsOutput = pCtx[i - 1].pOutput;
}
// if (!pResInfo->initialized) {
// aAggs[functionId].init(&pCtx[i], pResInfo);
// }
}
}
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
@ -4016,64 +3850,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
#if 0
// TODO set the tags scan handle
if (onlyQueryTags(pQueryAttr)) {
return TSDB_CODE_SUCCESS;
}
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
}
if (!isSTableQuery
&& (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)
&& (cond.order == TSDB_ORDER_ASC)
&& (!QUERY_IS_INTERVAL_QUERY(pQueryAttr))
&& (!pQueryAttr->groupbyColumn)
&& (!pQueryAttr->simpleAgg)
) {
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
cond.twindow = pCheckInfo->win;
}
terrno = TSDB_CODE_SUCCESS;
if (isFirstLastRowQuery(pQueryAttr)) {
pRuntimeEnv->pTsdbReadHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
// update the query time window
pQueryAttr->window = cond.twindow;
if (pQueryAttr->tableGroupInfo.numOfTables == 0) {
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
} else {
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
pCheckInfo->win = pQueryAttr->window;
pCheckInfo->lastKey = pCheckInfo->win.skey;
}
}
}
} else if (isCachedLastQuery(pQueryAttr)) {
pRuntimeEnv->pTsdbReadHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else if (pQueryAttr->pointInterpQuery) {
pRuntimeEnv->pTsdbReadHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
} else {
pRuntimeEnv->pTsdbReadHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
}
#endif
return terrno;
}
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
if (order == TSDB_ORDER_ASC) {
@ -4612,7 +4388,6 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
goto _error;
}
size_t size = pBlock->info.numOfCols;
pInfo->pResult = pBlock;
pInfo->seqLoadData = true;
@ -4623,7 +4398,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = size;
pOperator->numOfOutput = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function.
pOperator->getNextFn = doLoadRemoteData;
@ -5811,84 +5586,78 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
pOperator->status = OP_EXEC_DONE;
}
// SQInfo* pQInfo = pRuntimeEnv->qinfo;
// pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->binfo.pRes;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pSDataBlock) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
int64_t gid = pBlock->info.groupId;
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) {
pInfo->reptScan = true;
taosMemoryFreeClear(pInfo->prevData);
}
bool masterScan = true;
int32_t numOfOutput = pOperator->numOfOutput;
pInfo->numOfRows = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) {
int16_t bytes = pStateColInfoData->info.bytes;
int16_t type = pStateColInfoData->info.type;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg)) {
continue;
}
if (pInfo->prevData == NULL) {
pInfo->prevData = taosMemoryMalloc(bytes);
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->start = j;
} else if (memcmp(pInfo->prevData, val, bytes) == 0) {
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows += 1;
// pInfo->start = j;
if (j == 0 && pInfo->start != 0) {
pInfo->numOfRows = 1;
pInfo->start = 0;
char* val = colDataGetData(pStateColInfoData, j);
if (!pInfo->hasKey) {
memcpy(pInfo->stateKey.pData, val, bytes);
pInfo->hasKey = true;
doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]);
} else if (memcmp(pInfo->stateKey.pData, val, bytes) == 0) {
doKeepTuple(pRowSup, tsList[j]);
if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0;
}
} else {
} else { // a new state window started
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid,
&pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx,
pOperator->numOfOutput, pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
// pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->start = j;
// keep the time window for the closed time window.
STimeWindow window = pRowSup->win;
pRowSup->win.ekey = pRowSup->win.skey;
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan,
&pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->timeWindowData, &window, false);
doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]);
}
}
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow,
masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pRowSup->win, masterScan, &pResult,
gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
// pSDataBlock->info.rows, pOperator->numOfOutput);
updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false);
doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
}
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
@ -5896,16 +5665,16 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
// pOperator->status = OP_EXEC_DONE;
// }
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
return pBInfo->pRes;
}
@ -5923,28 +5692,20 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
break;
}
// setInputDataBlock(pOperator, pBInfo->pCtx, pDataBlock, TSDB_ORDER_ASC);
// if (pWindowInfo->colIndex == -1) {
// pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
// }
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order);
doStateWindowAggImpl(pOperator, pInfo, pBlock);
}
// restore the value
// pQueryAttr->order.order = order;
// pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
// pOperator->status = OP_EXEC_DONE;
// }
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity);
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
@ -6268,7 +6029,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->prevData);
taosMemoryFreeClear(pInfo->stateKey.pData);
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
@ -6493,7 +6254,6 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doAllIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
@ -6503,16 +6263,18 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->colIndex = -1;
pInfo->reptScan = false;
// pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, pTaskInfo->id.str);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "StateWindowOperator";
// pOperator->operatorType = OP_StateWindow;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
@ -6525,6 +6287,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_SUCCESS;
return NULL;
}
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@ -6546,7 +6312,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock;
pInfo->prevTs = INT64_MIN;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
@ -6589,7 +6355,6 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
@ -6614,7 +6379,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->getNextFn = doAllSTableIntervalAgg;
pOperator->closeFn = destroyBasicOperatorInfo;
@ -6687,52 +6451,6 @@ _error:
return NULL;
}
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSLimitOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
// pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
// pInfo->slimit = pQueryAttr->slimit;
// pInfo->limit = pQueryAttr->limit;
// pInfo->capacity = pResultInfo->capacity;
// pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
// pInfo->currentOffset = pQueryAttr->limit.offset;
// pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->multigroupResult = multigroupResult;
// TODO refactor
int32_t len = 0;
for (int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.resSchema.bytes;
}
int32_t numOfCols = (pInfo->orderColumnList != NULL) ? (int32_t)taosArrayGetSize(pInfo->orderColumnList) : 0;
pInfo->prevRow = taosMemoryCalloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
SColIndex* index = taosArrayGet(pInfo->orderColumnList, i);
offset += pExpr[index->colIndex].base.resSchema.bytes;
}
// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity);
pOperator->name = "SLimitOperator";
// pOperator->operatorType = OP_SLimit;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
// pOperator->exec = doSLimit;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->closeFn = destroySlimitOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
@ -6740,7 +6458,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info;
@ -6872,7 +6589,6 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
pInfo->totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
pInfo->curPos = 0;
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
@ -6884,7 +6600,6 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo
pOperator->getNextFn = doTagScan;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->closeFn = destroyTagScanOperatorInfo;
return pOperator;
@ -7168,9 +6883,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
int32_t num = 0;
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
@ -7179,8 +6894,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
int32_t num = 0;
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
@ -7194,7 +6907,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
@ -7219,7 +6931,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
@ -7228,10 +6939,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
} else if (QUERY_NODE_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createStatewindowOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
} else {
ASSERT(0);
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {

View File

@ -48,7 +48,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char**
SColumn* pCol = taosArrayGet(pGroupColList, i);
(*keyLen) += pCol->bytes;
struct SGroupKeys key = {0};
SGroupKeys key = {0};
key.bytes = pCol->bytes;
key.type = pCol->type;
key.isNull = false;
@ -124,6 +124,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
pkey->isNull = true;
} else {
pkey->isNull = false;
char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val));
@ -340,6 +341,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupOperatorInfo;

View File

@ -208,12 +208,15 @@ void MonitorTest::GetDiskInfo(SMonDiskInfo *pInfo) {
void MonitorTest::GetLogInfo(SMonLogs *logs) {
logs->logs = taosArrayInit(4, sizeof(SMonLogItem));
SMonLogItem item1 = {.level = DEBUG_INFO};
SMonLogItem item1 = {0};
item1.level = DEBUG_INFO;
item1.ts = taosGetTimestampMs();
strcpy(item1.content, "log test1");
taosArrayPush(logs->logs, &item1);
SMonLogItem item2 = {.level = DEBUG_ERROR};
SMonLogItem item2 = {0};
item2.level = DEBUG_ERROR;
item2.ts = taosGetTimestampMs();
strcpy(item2.content, "log test2");
taosArrayPush(logs->logs, &item2);

View File

@ -1791,13 +1791,14 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
// fi->data = null; use fi->desc as data, because json value is variable, so use tVariant (fi->desc)
}
if(type != TSDB_DATA_TYPE_JSON){
if(type != TSDB_DATA_TYPE_JSON) {
if (dType->type == type) {
assignVal(fi->data, nodesGetValueFromNode(var), dType->bytes, type);
} else {
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
out.columnData->info.type = type;
out.columnData->info.bytes = tDataTypes[type].bytes;
ASSERT(!IS_VAR_DATA_TYPE(type));
// todo refactor the convert
int32_t code = doConvertDataType(var, &out);
@ -2953,8 +2954,8 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, int8_t** p, SColumnD
for (int32_t i = 0; i < numOfRows; ++i) {
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
if (colData == NULL || colDataIsNull((SColumnInfoData *)info->cunits[0].colData, 0, i, NULL)) {
SColumnInfoData* pData = info->cunits[0].colData;
if (colData == NULL || colDataIsNull_s(pData, i)) {
all = false;
continue;
}

View File

@ -191,13 +191,23 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
SColumnNode *ref = (SColumnNode *)node;
if (ref->dataBlockId >= taosArrayGetSize(ctx->pBlockList)) {
int32_t index = -1;
for(int32_t i = 0; i < taosArrayGetSize(ctx->pBlockList); ++i) {
SSDataBlock* pb = taosArrayGetP(ctx->pBlockList, i);
if (pb->info.blockId == ref->dataBlockId) {
index = i;
break;
}
}
if (index == -1) {
sclError("column tupleId is too big, tupleId:%d, dataBlockNum:%d", ref->dataBlockId, (int32_t)taosArrayGetSize(ctx->pBlockList));
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, ref->dataBlockId);
if (NULL == block || ref->slotId >= taosArrayGetSize(block->pDataBlock)) {
SSDataBlock *block = *(SSDataBlock **)taosArrayGet(ctx->pBlockList, index);
if (NULL == block || ref->slotId >= block->info.numOfCols) {
sclError("column slotId is too big, slodId:%d, dataBlockSize:%d", ref->slotId, (int32_t)taosArrayGetSize(block->pDataBlock));
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}

View File

@ -100,6 +100,8 @@ while $dbCnt < 2
return -1
endi
print =================> 1
sql select * from `Table`
print rows: $rows
print $data00 $data01 $data02 $data03
@ -117,6 +119,7 @@ while $dbCnt < 2
return -1
endi
print ================>2
sql select * from `TAble`
print rows: $rows
print $data00 $data01 $data02 $data03

View File

@ -21,7 +21,7 @@ run tsim/insert/null.sim
run tsim/query/interval.sim
run tsim/query/interval-offset.sim
run tsim/query/scalarFunction.sim
#run tsim/query/scalarFunction.sim
run tsim/show/basic.sim

View File

@ -151,6 +151,7 @@ print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20, 0}@ then
print expect @{consume success: 20, 0}@, actual: $system_content
return -1
endi