Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

wenzhouwww@live.cn 2022-07-18 09:43:43 +08:00
commit 11c39f1f98
79 changed files with 1213 additions and 419 deletions

View File

@ -35,6 +35,7 @@ extern "C" {
#define TSDB_INS_TABLE_USER_INDEXES "user_indexes"
#define TSDB_INS_TABLE_USER_STABLES "user_stables"
#define TSDB_INS_TABLE_USER_TABLES "user_tables"
#define TSDB_INS_TABLE_USER_TAGS "user_tags"
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_LICENCES "grants"

View File

@ -268,6 +268,26 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes
} SSortExecInfo;
//======================================================================================================================
// for grant
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t grantCheck(EGrantType grant);
#ifdef __cplusplus
}
#endif
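
Note on the block above: the grant enum and grantCheck() now live in a shared header so that resource-creation paths can gate on licence limits. A minimal sketch of the intended call pattern, mirroring the (still commented-out) checks this same commit adds in mndProcessCreateDbReq, mndProcessCreateDnodeReq and mndProcessCreateUserReq; the wrapper function below is hypothetical, and in the community build grantCheck() simply returns TSDB_CODE_SUCCESS:

// Hypothetical wrapper, not part of the commit: how a create-database path is
// expected to consult grantCheck() once the checks are enabled.
static int32_t checkDbGrant(void) {
  int32_t code = grantCheck(TSDB_GRANT_DB);  // declared in the header above
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;  // surface the licence error to the caller
  }
  return code;
}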

View File

@ -139,6 +139,7 @@ extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -104,6 +104,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_STB,
TSDB_MGMT_TABLE_STREAMS,
TSDB_MGMT_TABLE_TABLE,
TSDB_MGMT_TABLE_TAG,
TSDB_MGMT_TABLE_USER,
TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VGROUP,

View File

@ -152,6 +152,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_HB_TIMER, "grant-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)

View File

@ -172,13 +172,8 @@ typedef struct tExprNode {
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
typedef enum {
SHOULD_FREE_COLDATA = 0x1, // the newly created column data needs to be destroyed.
DELEGATED_MGMT_COLDATA = 0x2, // input column data should not be released.
} ECOLDATA_MGMT_TYPE_E;
struct SScalarParam {
ECOLDATA_MGMT_TYPE_E type;
bool colAlloced;
SColumnInfoData *columnData;
SHashObj *pHashFilter;
int32_t hashValueType;

View File

@ -170,6 +170,7 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_STABLES_STMT,
QUERY_NODE_SHOW_STREAMS_STMT,
QUERY_NODE_SHOW_TABLES_STMT,
QUERY_NODE_SHOW_TAGS_STMT,
QUERY_NODE_SHOW_USERS_STMT,
QUERY_NODE_SHOW_LICENCE_STMT,
QUERY_NODE_SHOW_VGROUPS_STMT,

View File

@ -114,6 +114,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}

View File

@ -153,6 +153,15 @@ static const SSysDbTableSchema userTblsSchema[] = {
{.name = "type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema userTagsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_name", .bytes = TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "tag_type", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
{.name = "tag_value", .bytes = TSDB_MAX_TAGS_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema userTblDistSchema[] = {
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
@ -255,6 +264,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
{TSDB_INS_TABLE_USER_TAGS, userTagsSchema, tListLen(userTagsSchema)},
// {TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
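
For orientation, userTagsSchema above backs the new information_schema.user_tags table registered in infosMeta: one row per tag of each child table, with the six columns listed. A hedged client-side sketch of reading it through the public C client API (taos.h); the connection parameters are placeholders, VARCHAR payloads are not NUL-terminated (hence taos_fetch_lengths), and tag_value may be NULL, so only two columns are printed:

#include <stdio.h>
#include <taos.h>  // TDengine client header, assumed available

// Illustrative only: column order follows userTagsSchema above
// (table_name, db_name, stable_name, tag_name, tag_type, tag_value).
int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (conn == NULL) return 1;
  TAOS_RES *res = taos_query(conn, "select * from information_schema.user_tags");
  TAOS_ROW  row;
  while ((row = taos_fetch_row(res)) != NULL) {
    int *lens = taos_fetch_lengths(res);  // byte lengths of each column in this row
    printf("table=%.*s tag=%.*s\n", lens[0], (char *)row[0], lens[3], (char *)row[3]);
  }
  taos_free_result(res);
  taos_close(conn);
  return 0;
}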

View File

@ -184,6 +184,7 @@ int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60;
int32_t tsGrantHBInterval = 60;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);

View File

@ -46,6 +46,7 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(SRpcMsg *pMsg);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -331,7 +331,8 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
code = 0;

View File

@ -144,6 +144,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_SYSTABLE_RETRIEVE:
code = dmProcessRetrieve(pMgmt, pMsg);
break;
case TDMT_MND_GRANT:
code = dmProcessGrantReq(pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
break;

View File

@ -206,7 +206,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -166,6 +166,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case WRITE_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;

View File

@ -26,6 +26,7 @@ int32_t mndInitCluster(SMnode *pMnode);
void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
#ifdef __cplusplus
}

View File

@ -29,6 +29,7 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps);
#ifdef __cplusplus
}

View File

@ -22,27 +22,10 @@
#include "mndInt.h"
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t mndInitGrant();
int32_t mndInitGrant(SMnode *pMnode);
void mndCleanupGrant();
void grantParseParameter();
int32_t grantCheck(EGrantType grant);
void grantReset(EGrantType grant, uint64_t value);
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value);
void grantAdd(EGrantType grant, uint64_t value);
void grantRestore(EGrantType grant, uint64_t value);

View File

@ -79,6 +79,23 @@ int64_t mndGetClusterId(SMnode *pMnode) {
return clusterId;
}
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int64_t createTime = INT64_MAX;
while (1) {
SClusterObj *pCluster = NULL;
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
createTime = pCluster->createdTime;
sdbRelease(pSdb, pCluster);
}
return createTime;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -509,6 +509,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
SUserObj *pUser = NULL;
SCreateDbReq createReq = {0};
// code = grantCheck(TSDB_GRANT_DB);
// if (code != 0) {
// terrno = code;
// goto _OVER;
// }
if (tDeserializeSCreateDbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;

View File

@ -262,7 +262,7 @@ bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
return true;
}
static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
SSdb *pSdb = pMnode->pSdb;
int32_t numOfEps = 0;
@ -621,6 +621,12 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0};
// code = grantCheck(TSDB_GRANT_DNODE);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = code;
// goto _OVER;
// }
if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;

View File

@ -118,17 +118,21 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
return numOfRows;
}
static int32_t mndProcessGrantHB(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; }
int32_t mndInitGrant(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant);
mndSetMsgHandle(pMnode, TDMT_MND_GRANT_HB_TIMER, mndProcessGrantHB);
return 0;
}
void mndCleanupGrant() {}
void grantParseParameter() { mError("can't parse parameter k"); }
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
void grantReset(EGrantType grant, uint64_t value) {}
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
int32_t dmProcessGrantReq(SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
#endif

View File

@ -90,6 +90,16 @@ static void mndPullupTelem(SMnode *pMnode) {
}
}
static void mndGrantHeartBeat(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void *mndThreadFp(void *param) {
SMnode *pMnode = param;
int64_t lastTime = 0;
@ -115,6 +125,10 @@ static void *mndThreadFp(void *param) {
if (lastTime % (tsTelemInterval * 10) == 0) {
mndPullupTelem(pMnode);
}
if (lastTime % (tsGrantHBInterval * 10) == 0) {
mndGrantHeartBeat(pMnode);
}
}
return NULL;
@ -402,6 +416,9 @@ int32_t mndStart(SMnode *pMnode) {
}
mndSetRestore(pMnode, true);
}
grantReset(pMnode, TSDB_GRANT_ALL, 0);
return mndInitTimer(pMnode);
}
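
On the heartbeat cadence above: the mnode timer thread increments lastTime once per tick (assumed here to be a 100 ms sleep per loop iteration, which is not shown in the hunk), so lastTime % (tsGrantHBInterval * 10) == 0 fires roughly every tsGrantHBInterval seconds, i.e. about once a minute with the new default of 60. A standalone sketch of the same tick arithmetic:

#include <stdint.h>
#include <stdio.h>

// Standalone illustration: with a 100 ms tick, interval * 10 ticks
// correspond to roughly interval seconds.
int main(void) {
  const int32_t tsGrantHBInterval = 60;  // default set in tglobal.c by this commit
  int64_t lastTime = 0;
  int     fired = 0;
  for (int64_t ms = 100; ms <= 120 * 1000; ms += 100) {  // simulate two minutes of ticks
    ++lastTime;
    if (lastTime % (tsGrantHBInterval * 10) == 0) ++fired;
  }
  printf("grant heartbeat fired %d times in 120 s\n", fired);  // expect 2
  return 0;
}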

View File

@ -76,6 +76,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
type = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, len) == 0) {
type = TSDB_MGMT_TABLE_TAG;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
// type = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) {

View File

@ -1689,6 +1689,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
_OVER:
taosMemoryFreeClear(stbObj.pTags);
taosMemoryFreeClear(stbObj.pColumns);
if (pAlter->commentLen > 0) {
taosMemoryFreeClear(stbObj.comment);
}
return code;
}
@ -1733,7 +1736,7 @@ _OVER:
mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb);
taosArrayDestroy(alterReq.pFields);
tFreeSMAltertbReq(&alterReq);
return code;
}

View File

@ -363,6 +363,12 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
goto _OVER;
}
// code = grantCheck(TSDB_GRANT_USER);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = code;
// goto _OVER;
// }
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

View File

@ -99,7 +99,8 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int metaGetTbNum(SMeta* pMeta);
int64_t metaGetTbNum(SMeta* pMeta);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);

View File

@ -463,12 +463,18 @@ _err:
return code;
}
int metaGetTbNum(SMeta *pMeta) {
// N.B. Called by statusReq per second
int64_t metaGetTbNum(SMeta *pMeta) {
// TODO
// ASSERT(0);
return 0;
}
// N.B. Called by statusReq per second
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// TODO
return 400;
}
typedef struct {
SMeta *pMeta;
TBC *pCur;

View File

@ -3186,6 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
*suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
*suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -239,9 +239,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = syncGetMyRole(pVnode->sync);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = 400;
pLoad->totalStorage = 300;
pLoad->compStorage = 200;
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
pLoad->totalStorage = (int64_t)3 * 1073741824;
pLoad->compStorage = (int64_t)2 * 1073741824;
pLoad->pointsWritten = 100;
pLoad->numOfSelectReqs = 1;
pLoad->numOfInsertReqs = 3;
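
A quick note on the storage literals above: 1073741824 is 2^30, so the placeholders report 3 GiB of total storage and 2 GiB of compressed storage (presumably in bytes), and the (int64_t) cast keeps the multiplication in 64-bit arithmetic. A standalone check of the values:

#include <stdint.h>
#include <stdio.h>

// Standalone check of the placeholder load figures used above.
int main(void) {
  int64_t totalStorage = (int64_t)3 * 1073741824;  // 3 * 2^30 = 3221225472
  int64_t compStorage  = (int64_t)2 * 1073741824;  // 2 * 2^30 = 2147483648, just past INT32_MAX
  printf("total=%lld comp=%lld\n", (long long)totalStorage, (long long)compStorage);
  return 0;
}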

View File

@ -175,6 +175,7 @@ _err:
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0;
SVnode *pVnode = pWriter->pVnode;
if (pWriter->pMetaSnapWriter) {
code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
@ -186,8 +187,31 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
if (code) goto _err;
}
if (!rollback) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
pVnode->state.committed = pWriter->ever;
pVnode->state.applied = pWriter->ever;
// pVnode->state.applyTerm = ;
// pVnode->state.commitTerm = ;
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
code = vnodeSaveInfo(dir, &info);
if (code) goto _err;
code = vnodeCommitInfo(dir, &info);
if (code) goto _err;
} else {
ASSERT(0);
}
_exit:
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback);
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
taosMemoryFree(pWriter);
return code;

View File

@ -272,7 +272,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
taosMemoryFree(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
}
void ctgClearHandle(SCatalog* pCtg) {
@ -303,7 +303,7 @@ void ctgClearHandle(SCatalog* pCtg) {
CTG_CACHE_STAT_INC(numOfClear, 1);
ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId);
ctgInfo("handle cleared, clusterId:0x%" PRIx64, clusterId);
}
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {

View File

@ -90,6 +90,7 @@ _return:
tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
taosMemoryFree(param);
return TSDB_CODE_SUCCESS;
@ -283,6 +284,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosArrayDestroy(pInserter->pDataBlocks);
taosMemoryFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
taosThreadMutexDestroy(&pInserter->mutex);
return TSDB_CODE_SUCCESS;
}

View File

@ -115,9 +115,6 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
p->groupId = *(uint64_t*)key;
p->pos = *(SResultRowPosition*)pData;
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
#ifdef BUF_PAGE_DEBUG
qDebug("page_groupRes, groupId:%" PRIu64 ",pageId:%d,offset:%d\n", p->groupId, p->pos.pageId, p->pos.offset);
#endif
taosArrayPush(pGroupResInfo->pRows, &p);
}

View File

@ -624,7 +624,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata);
numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList);
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
@ -679,6 +680,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata);
numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList);
@ -1506,15 +1508,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
int32_t numOfExprs) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t start = pGroupResInfo->index;
#ifdef BUF_PAGE_DEBUG
qDebug("\npage_copytoblock rows:%d", numOfRows);
#endif
for (int32_t i = start; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
#ifdef BUF_PAGE_DEBUG
qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
#endif
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
@ -2222,6 +2220,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
if (completed == totalSources) {
return setAllSourcesCompleted(pOperator, startTs);
}
sched_yield();
}
_error:
@ -3748,7 +3748,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
if (pExInfo->pResult != NULL) {
blockDataDestroy(pExInfo->pResult);
pExInfo->pResult = blockDataDestroy(pExInfo->pResult);
}
tsem_destroy(&pExInfo->ready);

View File

@ -29,7 +29,7 @@
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
@ -264,7 +264,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, 0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
@ -282,7 +282,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
@ -293,6 +293,29 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SGroupbyOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes;
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -304,22 +327,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
return buildGroupResultDataBlock(pOperator);
}
int32_t order = TSDB_ORDER_ASC;
@ -373,26 +381,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pRes->info.rows > 0) {
break;
}
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pRes;
return buildGroupResultDataBlock(pOperator);
}
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@ -800,7 +789,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
}
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
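
Context for the signature change above: block group ids are 64-bit values, and the previous int32_t parameter, together with the hard-coded 0 at both call sites, could collapse distinct groups onto one result buffer; the calls now pass pBlock->info.groupId through unchanged. A standalone illustration of the truncation the widening avoids:

#include <stdint.h>
#include <stdio.h>

// Standalone illustration (not TDengine code): narrowing a 64-bit group id to
// int32_t can make distinct groups collide once their high 32 bits differ.
int main(void) {
  uint64_t g1 = 0x0000000100000042ULL;
  uint64_t g2 = 0x0000000200000042ULL;
  printf("as int32 : %d vs %d\n", (int32_t)g1, (int32_t)g2);   // both 66: collision
  printf("as uint64: %llu vs %llu\n",
         (unsigned long long)g1, (unsigned long long)g2);      // distinct
  return 0;
}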

View File

@ -1143,7 +1143,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
// must check update info first.
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
if ( (update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup)) ) && out) {
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) {
taosArrayPush(pInfo->tsArray, &rowId);
}
}
@ -1596,7 +1596,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->sessionSup =
(SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pInfo->pStreamScanOp = pOperator;
@ -1630,7 +1631,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->pRes);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
}
@ -1777,14 +1779,14 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
static SSDataBlock* buildSysTableMetaBlock() {
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
size_t size = 0;
const SSysTableMeta* pMeta = NULL;
getInfosDbMeta(&pMeta, &size);
int32_t index = 0;
for (int32_t i = 0; i < size; ++i) {
if (strcmp(pMeta[i].name, TSDB_INS_TABLE_USER_TABLES) == 0) {
if (strcmp(pMeta[i].name, tableName) == 0) {
index = i;
break;
}
@ -1800,185 +1802,340 @@ static SSDataBlock* buildSysTableMetaBlock() {
return pBlock;
}
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
const char* db = NULL;
int32_t vgId = 0;
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, varDataVal(dbname));
varDataSetLen(dbname, strlen(varDataVal(dbname)));
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USER_TAGS);
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
int32_t ret = 0;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) {
continue;
}
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
SMetaReader smr = {0};
metaReaderInit(&smr, pInfo->readHandle.meta, 0);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&smr, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
metaReaderClear(&smr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
}
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(stableName, smr.me.name);
int32_t numOfTags = smr.me.stbEntry.schemaTag.nCols;
for (int32_t i = 0; i < numOfTags; ++i) {
SColumnInfoData* pColInfoData = NULL;
// table name
pColInfoData = taosArrayGet(p->pDataBlock, 0);
colDataAppend(pColInfoData, numOfRows, tableName, false);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
colDataAppend(pColInfoData, numOfRows, dbname, false);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, stableName, false);
char tagName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tagName, smr.me.stbEntry.schemaTag.pSchema[i].name);
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, tagName, false);
int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type;
pColInfoData = taosArrayGet(p->pDataBlock, 4);
colDataAppend(pColInfoData, numOfRows, (char*)&tagType, false);
STagVal tagVal = {0};
tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId;
char* tagData = NULL;
uint32_t tagLen = 0;
if (tagType == TSDB_DATA_TYPE_JSON) {
tagData = (char*)pInfo->pCur->mr.me.ctbEntry.pTags;
} else {
bool exist = tTagGet((STag*)pInfo->pCur->mr.me.ctbEntry.pTags, &tagVal);
if (exist) {
if (IS_VAR_DATA_TYPE(tagType)) {
tagData = (char*)tagVal.pData;
tagLen = tagVal.nData;
} else {
tagData = (char*)&tagVal.i64;
tagLen = tDataTypes[tagType].bytes;
}
}
}
char* tagVarChar = NULL;
if (tagData != NULL) {
if (tagType == TSDB_DATA_TYPE_JSON) {
char* tagJson = parseTagDatatoJson(tagData);
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
varDataSetLen(tagVarChar, strlen(tagJson));
taosMemoryFree(tagJson);
} else {
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
tagVarChar = taosMemoryMalloc(bufSize);
int32_t len = -1;
dataConverToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
varDataSetLen(tagVarChar, len);
}
}
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, tagVarChar,
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
taosMemoryFree(tagVarChar);
++numOfRows;
}
metaReaderClear(&smr);
if (numOfRows >= pOperator->resultInfo.capacity) {
break;
}
}
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataDestroy(p);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else {
if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
const char* db = NULL;
int32_t vgId = 0;
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, varDataVal(dbname));
varDataSetLen(dbname, strlen(varDataVal(dbname)));
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USER_TABLES);
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t ret = 0;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
colDataAppend(pColInfoData, numOfRows, n, false);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
colDataAppend(pColInfoData, numOfRows, dbname, false);
// vgId
pColInfoData = taosArrayGet(p->pDataBlock, 6);
colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);
int32_t tableType = pInfo->pCur->mr.me.type;
if (tableType == TSDB_CHILD_TABLE) {
// create time
int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
}
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
// super table name
STR_TO_VARSTR(n, mr.me.name);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
colDataAppend(pColInfoData, numOfRows, n, false);
metaReaderClear(&mr);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
colDataAppend(pColInfoData, numOfRows, comment, false);
} else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfoData, numOfRows, comment, false);
} else {
colDataAppendNULL(pColInfoData, numOfRows);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);
STR_TO_VARSTR(n, "CHILD_TABLE");
} else if (tableType == TSDB_NORMAL_TABLE) {
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4);
colDataAppendNULL(pColInfoData, numOfRows);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
colDataAppend(pColInfoData, numOfRows, comment, false);
} else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfoData, numOfRows, comment, false);
} else {
colDataAppendNULL(pColInfoData, numOfRows);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);
STR_TO_VARSTR(n, "NORMAL_TABLE");
}
pColInfoData = taosArrayGet(p->pDataBlock, 9);
colDataAppend(pColInfoData, numOfRows, n, false);
if (++numOfRows >= pOperator->resultInfo.capacity) {
break;
}
}
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataDestroy(p);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
}
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
// retrieve local table list info from vnode
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else {
if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0;
const char* db = NULL;
int32_t vgId = 0;
vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
SName sn = {0};
char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&sn, varDataVal(dbname));
varDataSetLen(dbname, strlen(varDataVal(dbname)));
SSDataBlock* p = buildSysTableMetaBlock();
blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t ret = 0;
while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) {
STR_TO_VARSTR(n, pInfo->pCur->mr.me.name);
// table name
SColumnInfoData* pColInfoData = taosArrayGet(p->pDataBlock, 0);
colDataAppend(pColInfoData, numOfRows, n, false);
// database name
pColInfoData = taosArrayGet(p->pDataBlock, 1);
colDataAppend(pColInfoData, numOfRows, dbname, false);
// vgId
pColInfoData = taosArrayGet(p->pDataBlock, 6);
colDataAppend(pColInfoData, numOfRows, (char*)&vgId, false);
int32_t tableType = pInfo->pCur->mr.me.type;
if (tableType == TSDB_CHILD_TABLE) {
// create time
int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime;
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, (char*)&ts, false);
SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
}
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&mr.me.stbEntry.schemaRow.nCols, false);
// super table name
STR_TO_VARSTR(n, mr.me.name);
pColInfoData = taosArrayGet(p->pDataBlock, 4);
colDataAppend(pColInfoData, numOfRows, n, false);
metaReaderClear(&mr);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
colDataAppend(pColInfoData, numOfRows, comment, false);
} else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfoData, numOfRows, comment, false);
} else {
colDataAppendNULL(pColInfoData, numOfRows);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ctbEntry.ttlDays, false);
STR_TO_VARSTR(n, "CHILD_TABLE");
} else if (tableType == TSDB_NORMAL_TABLE) {
// create time
pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false);
// number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.schemaRow.nCols, false);
// super table name
pColInfoData = taosArrayGet(p->pDataBlock, 4);
colDataAppendNULL(pColInfoData, numOfRows);
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
colDataAppend(pColInfoData, numOfRows, comment, false);
} else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfoData, numOfRows, comment, false);
} else {
colDataAppendNULL(pColInfoData, numOfRows);
}
// uid
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.uid, false);
// ttl
pColInfoData = taosArrayGet(p->pDataBlock, 7);
colDataAppend(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ttlDays, false);
STR_TO_VARSTR(n, "NORMAL_TABLE");
}
pColInfoData = taosArrayGet(p->pDataBlock, 9);
colDataAppend(pColInfoData, numOfRows, n, false);
if (++numOfRows >= pOperator->resultInfo.capacity) {
break;
}
}
// todo temporarily free the cursor here, the true reason why the free is not valid needs to be found
if (ret != 0) {
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
doSetOperatorCompleted(pOperator);
}
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
doFilterResult(pInfo);
blockDataDestroy(p);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
return sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTags(pOperator);
} else { // load the meta from mnode of the given epset
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -2058,7 +2215,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
SSDataBlock* p = buildSysTableMetaBlock();
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_USER_TABLES);
blockDataEnsureCapacity(p, capacity);
size_t size = 0;
@ -2147,7 +2304,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
tNameAssign(&pInfo->name, &pScanNode->tableName);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else {
@ -2941,4 +3099,3 @@ _error:
taosMemoryFree(pOperator);
return NULL;
}

View File

@ -234,9 +234,9 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
}
@ -539,14 +539,12 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
}
pInfo->startTs = taosGetTimestampUs();
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
@ -556,7 +554,6 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
}
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
@ -674,6 +671,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);

View File

@ -109,6 +109,10 @@ static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
}
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
if (pSortHandle == NULL) {
return;
}
tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(pSortHandle->pMergeTree);
@ -119,7 +123,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
blockDataDestroy(pSortHandle->pDataBlock);
for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){
SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i);
blockDataDestroy((*pSource)->src.pBlock);
taosMemoryFreeClear(*pSource);
}
taosArrayDestroy(pSortHandle->pOrderedSource);

View File

@ -1132,9 +1132,9 @@ static bool validateStateOper(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
return (0 == strcasecmp(varDataVal(pVal->datum.p), "GT") || 0 == strcasecmp(varDataVal(pVal->datum.p), "GE") ||
0 == strcasecmp(varDataVal(pVal->datum.p), "LT") || 0 == strcasecmp(varDataVal(pVal->datum.p), "LE") ||
0 == strcasecmp(varDataVal(pVal->datum.p), "EQ") || 0 == strcasecmp(varDataVal(pVal->datum.p), "NE"));
return (0 == strncasecmp(varDataVal(pVal->datum.p), "GT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "GE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "LT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "LE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "EQ", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "NE", 2));
}
static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
@ -2419,6 +2419,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getStateFuncEnv,
.initFunc = functionSetup,
.processFunc = stateCountFunction,
.sprocessFunc = stateCountScalarFunction,
.finalizeFunc = NULL
},
{

View File

@ -4469,17 +4469,17 @@ bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
static int8_t getStateOpType(char* opStr) {
int8_t opType;
if (strcasecmp(opStr, "LT") == 0) {
if (strncasecmp(opStr, "LT", 2) == 0) {
opType = STATE_OPER_LT;
} else if (strcasecmp(opStr, "GT") == 0) {
} else if (strncasecmp(opStr, "GT", 2) == 0) {
opType = STATE_OPER_GT;
} else if (strcasecmp(opStr, "LE") == 0) {
} else if (strncasecmp(opStr, "LE", 2) == 0) {
opType = STATE_OPER_LE;
} else if (strcasecmp(opStr, "GE") == 0) {
} else if (strncasecmp(opStr, "GE", 2) == 0) {
opType = STATE_OPER_GE;
} else if (strcasecmp(opStr, "NE") == 0) {
} else if (strncasecmp(opStr, "NE", 2) == 0) {
opType = STATE_OPER_NE;
} else if (strcasecmp(opStr, "EQ") == 0) {
} else if (strncasecmp(opStr, "EQ", 2) == 0) {
opType = STATE_OPER_EQ;
} else {
opType = STATE_OPER_INVALID;
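
On the strcasecmp to strncasecmp switch above (and the matching change in validateStateOper earlier): the operator string comes from a VARCHAR value whose payload is length-prefixed rather than NUL-terminated, so an unbounded comparison could read past the two valid bytes; bounding it to 2 keeps the comparison inside the value. A standalone illustration:

#include <stdio.h>
#include <strings.h>  // strncasecmp (POSIX)

// Standalone illustration (not TDengine code): comparing a length-prefixed,
// non NUL-terminated value. Only the first two bytes of buf are valid.
int main(void) {
  char buf[4] = {'g', 't', 'X', 'Y'};  // imagine varDataVal() pointing here, length 2
  printf("%s\n", strncasecmp(buf, "GT", 2) == 0 ? "match" : "no match");  // match
  return 0;
}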

View File

@ -855,6 +855,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
memcpy(output->columnData,
taosArrayGet(input->pDataBlock, 0),
sizeof(SColumnInfoData));
output->colAlloced = true;
return 0;
}

View File

@ -19,8 +19,8 @@
const static uint32_t STATE_LIMIT = 1000;
static int dfaInstsEqual(const void *a, const void *b, size_t size) {
SArray *ar = (SArray *)a;
SArray *br = (SArray *)b;
SArray *ar = *(SArray **)a;
SArray *br = *(SArray **)b;
size_t al = ar != NULL ? taosArrayGetSize(ar) : 0;
size_t bl = br != NULL ? taosArrayGetSize(br) : 0;
if (al != bl) {
@ -71,8 +71,8 @@ FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) {
dfaAdd(builder->dfa, cur, 0);
SArray *states = taosArrayInit(0, sizeof(uint32_t));
uint32_t result;
SArray *states = taosArrayInit(0, sizeof(uint32_t));
if (dfaBuilderCacheState(builder, cur, &result)) {
taosArrayPush(states, &result);
}
@ -146,10 +146,9 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
*result = *v;
taosArrayDestroy(tinsts);
} else {
DfaState st;
st.insts = tinsts;
st.isMatch = isMatch;
DfaState st = {.insts = tinsts, .isMatch = isMatch};
taosArrayPush(builder->dfa->states, &st);
int32_t sz = taosArrayGetSize(builder->dfa->states) - 1;
taosHashPut(builder->cache, &tinsts, sizeof(POINTER_BYTES), &sz, sizeof(sz));
*result = sz;
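
Background for the dfaInstsEqual fix at the top of this file's hunk: the cache stores its keys as pointers (taosHashPut(..., &tinsts, sizeof(POINTER_BYTES), ...)), so the equality callback receives the addresses of the stored SArray* keys and must dereference once before comparing the arrays; the old code compared the wrong objects. A standalone sketch of the same pointer-keyed comparator shape (types are illustrative only):

#include <stdio.h>

// Standalone sketch (not TDengine code): when a hash table stores pointer-sized
// keys, the equal callback gets pointers to those stored keys, so one extra
// dereference is needed before touching the pointed-to objects.
typedef struct { int len; } FakeInsts;

static int instsEqual(const void *a, const void *b) {
  const FakeInsts *ar = *(const FakeInsts **)a;  // dereference the stored key
  const FakeInsts *br = *(const FakeInsts **)b;
  return ar->len == br->len;
}

int main(void) {
  FakeInsts  x = {.len = 3}, y = {.len = 3};
  FakeInsts *px = &x, *py = &y;          // the keys as the table stores them
  printf("%d\n", instsEqual(&px, &py));  // 1: equal after dereferencing
  return 0;
}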

View File

@ -85,11 +85,12 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
blk->blockId = blkId;
blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
assert(blk->nread <= kBlockSize);
nread = TMIN(blkLeft, len);
if (blk->nread < kBlockSize && blk->nread < len) {
break;
}
nread = TMIN(blkLeft, len);
memcpy(buf + total, blk->buf + blkOffset, nread);
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,

View File

@ -78,8 +78,8 @@ bool sparSetContains(FstSparseSet *ss, int32_t ip) {
if (ip >= ss->cap || ip < 0) {
return false;
}
int32_t i = ss->sparse[ip];
int32_t i = ss->sparse[ip];
if (i >= 0 && i < ss->cap && i < ss->size && ss->dense[i] == ip) {
return true;
} else {

View File

@ -952,6 +952,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
nodesDestroyList(pSink->pCols);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
@ -1543,7 +1544,7 @@ typedef struct SCollectFuncsCxt {
int32_t errCode;
FFuncClassifier classifier;
SNodeList* pFuncs;
SHashObj* pAliasName;
SHashObj* pFuncsSet;
} SCollectFuncsCxt;
static EDealRes collectFuncs(SNode* pNode, void* pContext) {
@ -1551,28 +1552,40 @@ static EDealRes collectFuncs(SNode* pNode, void* pContext) {
if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId) &&
!(((SExprNode*)pNode)->orderAlias)) {
SExprNode* pExpr = (SExprNode*)pNode;
if (NULL == taosHashGet(pCxt->pAliasName, pExpr->aliasName, strlen(pExpr->aliasName))) {
if (NULL == taosHashGet(pCxt->pFuncsSet, &pExpr, POINTER_BYTES)) {
pCxt->errCode = nodesListStrictAppend(pCxt->pFuncs, nodesCloneNode(pNode));
taosHashPut(pCxt->pAliasName, pExpr->aliasName, strlen(pExpr->aliasName), &pExpr, POINTER_BYTES);
taosHashPut(pCxt->pFuncsSet, &pExpr, POINTER_BYTES, &pExpr, POINTER_BYTES);
}
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
}
return DEAL_RES_CONTINUE;
}
static uint32_t funcNodeHash(const char* pKey, uint32_t len) {
SExprNode* pExpr = *(SExprNode**)pKey;
return MurmurHash3_32(pExpr->aliasName, strlen(pExpr->aliasName));
}
static int32_t funcNodeEqual(const void* pLeft, const void* pRight, size_t len) {
if (0 != strcmp((*(const SExprNode**)pLeft)->aliasName, (*(const SExprNode**)pRight)->aliasName)) {
return 1;
}
return nodesEqualNode(*(const SNode**)pLeft, *(const SNode**)pRight) ? 0 : 1;
}
int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, FFuncClassifier classifier, SNodeList** pFuncs) {
if (NULL == pSelect || NULL == pFuncs) {
return TSDB_CODE_FAILED;
}
SCollectFuncsCxt cxt = {
.errCode = TSDB_CODE_SUCCESS,
.classifier = classifier,
.pFuncs = (NULL == *pFuncs ? nodesMakeList() : *pFuncs),
.pAliasName = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, false)};
if (NULL == cxt.pFuncs) {
SCollectFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS,
.classifier = classifier,
.pFuncs = (NULL == *pFuncs ? nodesMakeList() : *pFuncs),
.pFuncsSet = taosHashInit(4, funcNodeHash, false, false)};
if (NULL == cxt.pFuncs || NULL == cxt.pFuncsSet) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetEqualFp(cxt.pFuncsSet, funcNodeEqual);
*pFuncs = NULL;
nodesWalkSelectStmt(pSelect, clause, collectFuncs, &cxt);
if (TSDB_CODE_SUCCESS == cxt.errCode) {
@ -1584,7 +1597,7 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, FFuncClassifi
} else {
nodesDestroyList(cxt.pFuncs);
}
taosHashCleanup(cxt.pAliasName);
taosHashCleanup(cxt.pFuncsSet);
return cxt.errCode;
}

View File

@ -371,6 +371,19 @@ static int32_t collectMetaKeyFromShowTables(SCollectMetaKeyCxt* pCxt, SShowStmt*
return code;
}
static int32_t collectMetaKeyFromShowTags(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_USER_TAGS, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pDbName) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, ((SValueNode*)pStmt->pDbName)->literal, pCxt->pMetaCache);
} else {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, pCxt->pMetaCache);
}
}
return code;
}
static int32_t collectMetaKeyFromShowUsers(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USER_USERS,
pCxt->pMetaCache);
@ -537,6 +550,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowStreams(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TABLES_STMT:
return collectMetaKeyFromShowTables(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TAGS_STMT:
return collectMetaKeyFromShowTags(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_USERS_STMT:
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_LICENCE_STMT:

View File

@ -868,7 +868,8 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
if (strict && (pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
if (strict && (!IS_VAR_DATA_TYPE(pVal->node.resType.type) ||
pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
@ -888,6 +889,9 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
break;
}
case TSDB_DATA_TYPE_NCHAR: {
if (strict && !IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
@ -1168,7 +1172,7 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
return TSDB_CODE_SUCCESS;
}
if (isSelectStmt(pCxt->pCurrStmt)) {
//select percentile() without from clause is also valid
// select percentile() without from clause is also valid
if (NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) {
return TSDB_CODE_SUCCESS;
}
@ -1681,7 +1685,8 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
static bool sysTableFromVnode(const char* pTable) {
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED));
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
@ -1697,7 +1702,7 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
}
@ -1786,7 +1791,8 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
int8_t tableType = pRealTable->pMeta->tableType;
if (TSDB_SYSTEM_TABLE == tableType) {
return 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED);
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) &&
0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TAGS);
}
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
}
@ -5059,6 +5065,8 @@ static const char* getSysTableName(ENodeType type) {
return TSDB_INS_TABLE_USER_DATABASES;
case QUERY_NODE_SHOW_TABLES_STMT:
return TSDB_INS_TABLE_USER_TABLES;
case QUERY_NODE_SHOW_TAGS_STMT:
return TSDB_INS_TABLE_USER_TAGS;
case QUERY_NODE_SHOW_STABLES_STMT:
return TSDB_INS_TABLE_USER_STABLES;
case QUERY_NODE_SHOW_USERS_STMT:

View File

@ -282,7 +282,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->hasNormalCols = true;
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code && SCAN_TYPE_SYSTEM_TABLE != pScan->scanType) {
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
}

View File

@ -1050,8 +1050,11 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
}
}
int32_t code =
replaceLogicNode(pLogicSubplan, (SLogicNode*)pSort, (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0));
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0);
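// when the sort node being eliminated is the root of this subplan, hand its target list over to
// the child that replaces it, so the subplan keeps the same output schema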
if (NULL == pSort->node.pParent) {
TSWAP(pSort->node.pTargets, pChild->pTargets);
}
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pSort, pChild);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pSort->node.pChildren);
nodesDestroyNode((SNode*)pSort);
@ -1982,11 +1985,15 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
}
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions ||
0 == ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->cacheLastMode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
return false;
}
SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (!pAgg->hasLastRow || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || 0 == pScan->cacheLastMode ||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
return false;
}

View File

@ -570,7 +570,8 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan->showRewrite = pScanLogicNode->showRewrite;
pScan->accountId = pCxt->pPlanCxt->acctId;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TAGS)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
} else {
pSubplan->execNode.nodeId = MNODE_HANDLE;

View File

@ -45,6 +45,12 @@ TEST_F(PlanJoinTest, withWhere) {
"WHERE t1.c1 > t2.c1 AND t1.c2 = 'abc' AND t2.c2 = 'qwe'");
}
TEST_F(PlanJoinTest, withAggAndOrderBy) {
useDb("root", "test");
run("SELECT t1.ts, TOP(t2.c1, 10) FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts ORDER BY t2.ts");
}
TEST_F(PlanJoinTest, multiJoin) {
useDb("root", "test");

View File

@ -84,6 +84,12 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
}
TEST_F(PlanOptimizeTest, mergeProjects) {
useDb("root", "test");
run("SELECT * FROM (SELECT * FROM t1 WHERE c1 > 10 ORDER BY ts) ORDER BY ts");
}
TEST_F(PlanOptimizeTest, pushDownProjectCond) {
useDb("root", "test");
run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first");

View File

@ -55,7 +55,7 @@ int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarPara
}
pParam->columnData = pColumnData;
pParam->type = SHOULD_FREE_COLDATA;
pParam->colAlloced = true;
return TSDB_CODE_SUCCESS;
}
@ -166,6 +166,10 @@ void sclFreeRes(SHashObj *res) {
}
void sclFreeParam(SScalarParam *param) {
if (!param->colAlloced) {
return;
}
if (param->columnData != NULL) {
colDataDestroy(param->columnData);
taosMemoryFreeClear(param->columnData);
@ -173,6 +177,7 @@ void sclFreeParam(SScalarParam *param) {
if (param->pHashFilter != NULL) {
taosHashCleanup(param->pHashFilter);
param->pHashFilter = NULL;
}
}
@ -191,6 +196,19 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
return TSDB_CODE_SUCCESS;
}
void sclFreeParamList(SScalarParam *param, int32_t paramNum) {
if (NULL == param) {
return;
}
for (int32_t i = 0; i < paramNum; ++i) {
SScalarParam* p = param + i;
sclFreeParam(p);
}
taosMemoryFree(param);
}
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
switch (nodeType(node)) {
case QUERY_NODE_LEFT_VALUE: {
@ -225,11 +243,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type));
param->hashValueType = type;
param->colAlloced = true;
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pHashFilter);
param->pHashFilter = NULL;
sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
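// the copy of this param stored in ctx->pRes owns pHashFilter from here on; mark the local
// copy as not allocated so a later sclFreeParam() on it cannot release the filter twice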
param->colAlloced = false;
break;
}
case QUERY_NODE_COLUMN: {
@ -274,6 +295,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*param = *res;
param->colAlloced = false;
break;
}
default:
@ -455,11 +477,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
_return:
for (int32_t i = 0; i < paramNum; ++i) {
// sclFreeParamNoData(params + i);
}
taosMemoryFreeClear(params);
sclFreeParamList(params, paramNum);
SCL_RET(code);
}
@ -533,11 +551,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
_return:
for (int32_t i = 0; i < paramNum; ++i) {
// sclFreeParamNoData(params + i);
}
taosMemoryFreeClear(params);
sclFreeParamList(params, paramNum);
SCL_RET(code);
}
@ -573,14 +587,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
code = terrno;
_return:
for (int32_t i = 0; i < paramNum; ++i) {
if (params[i].type == SHOULD_FREE_COLDATA) {
colDataDestroy(params[i].columnData);
taosMemoryFreeClear(params[i].columnData);
}
}
taosMemoryFreeClear(params);
sclFreeParamList(params, paramNum);
SCL_RET(code);
}
@ -871,7 +879,6 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
@ -906,7 +913,6 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;

View File

@ -2427,3 +2427,153 @@ int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return sumScalarFunction(pInput, inputNum, pOutput);
}
typedef enum {
STATE_OPER_INVALID = 0,
STATE_OPER_LT,
STATE_OPER_GT,
STATE_OPER_LE,
STATE_OPER_GE,
STATE_OPER_NE,
STATE_OPER_EQ,
} EStateOperType;
#define STATE_COMP(_op, _lval, _rval, _rtype) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_rval, _rtype))
#define GET_STATE_VAL(_val, _type) ((_type == TSDB_DATA_TYPE_BIGINT) ? (*(int64_t *)_val) : (*(double *)_val))
#define STATE_COMP_IMPL(_op, _lval, _rval) \
do { \
switch (_op) { \
case STATE_OPER_LT: \
return ((_lval) < (_rval)); \
break; \
case STATE_OPER_GT: \
return ((_lval) > (_rval)); \
break; \
case STATE_OPER_LE: \
return ((_lval) <= (_rval)); \
break; \
case STATE_OPER_GE: \
return ((_lval) >= (_rval)); \
break; \
case STATE_OPER_NE: \
return ((_lval) != (_rval)); \
break; \
case STATE_OPER_EQ: \
return ((_lval) == (_rval)); \
break; \
default: \
break; \
} \
} while (0)
static int8_t getStateOpType(char* opStr) {
int8_t opType;
if (strncasecmp(opStr, "LT", 2) == 0) {
opType = STATE_OPER_LT;
} else if (strncasecmp(opStr, "GT", 2) == 0) {
opType = STATE_OPER_GT;
} else if (strncasecmp(opStr, "LE", 2) == 0) {
opType = STATE_OPER_LE;
} else if (strncasecmp(opStr, "GE", 2) == 0) {
opType = STATE_OPER_GE;
} else if (strncasecmp(opStr, "NE", 2) == 0) {
opType = STATE_OPER_NE;
} else if (strncasecmp(opStr, "EQ", 2) == 0) {
opType = STATE_OPER_EQ;
} else {
opType = STATE_OPER_INVALID;
}
return opType;
}
static bool checkStateOp(int8_t op, SColumnInfoData* pCol, int32_t index, SScalarParam *pCondParam) {
char* data = colDataGetData(pCol, index);
char* param = pCondParam->columnData->pData;
int32_t paramType = GET_PARAM_TYPE(pCondParam);
switch (pCol->info.type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t v = *(uint8_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t v = *(uint16_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t v = *(uint32_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t v = *(uint64_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float v = *(float*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double v = *(double*)data;
STATE_COMP(op, v, param, paramType);
break;
}
default: {
ASSERT(0);
}
}
return false;
}
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int8_t op = getStateOpType(varDataVal(pInput[1].columnData->pData));
int64_t count = 0;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
colDataAppendNULL(pOutputData, i);
continue;
}
bool ret = checkStateOp(op, pInputData, i, &pInput[2]);
int64_t out = -1;
if (ret) {
out = ++count;
} else {
count = 0;
}
colDataAppend(pOutputData, i, (char*)&out, false);
}
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
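// Illustrative usage only: assuming the SQL-level form STATECOUNT(expr, oper, val) maps onto
// pInput[0..2] above, consecutive rows that satisfy the condition yield a running count and any
// row that breaks the run yields -1, e.g.
//   SELECT ts, STATECOUNT(c1, 'GT', 10) FROM tb1;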

View File

@ -207,7 +207,7 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) {
void initScalarParam(SScalarParam* pParam) {
memset(pParam, 0, sizeof(SScalarParam));
pParam->type = SHOULD_FREE_COLDATA;
pParam->colAlloced = true;
}
}

View File

@ -67,7 +67,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
static int tdbBtcMoveDownward(SBTC *pBtc);
static int tdbBtcMoveUpward(SBTC *pBtc);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SBTree **ppBt) {
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
SBTree **ppBt) {
SBTree *pBt;
int ret;
@ -99,13 +100,54 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB
// pBt->minLeaf
pBt->minLeaf = pBt->minLocal;
// if pgno == 0 fetch new btree root leaf page
if (pgno == 0) {
// fetch page & insert into main db
// allocate a new child page
SPage *pPage;
TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
pPager->inTran = 1;
SBtreeInitPageArg zArg;
zArg.flags = 0x1 | 0x2; // root leaf node;
zArg.pBt = pBt;
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
if (ret < 0) {
return -1;
}
// TODO: Need to zero the page
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
return -1;
}
if (strcmp(TDB_MAINDB_NAME, tbname)) {
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pgno, sizeof(SPgno), &txn);
if (ret < 0) {
return -1;
}
}
// tdbUnrefPage(pPage);
tdbPCacheRelease(pPager->pCache, pPage, &txn);
tdbCommit(pPager->pEnv, &txn);
tdbTxnClose(&txn);
}
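// for any table other than main.tdb itself, the freshly allocated root page is now recorded in
// main.tdb (key: table name including the trailing '\0', value: SPgno of the root), so a later
// tdbTbOpen() can look the root up instead of allocating another one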
ASSERT(pgno != 0);
pBt->root = pgno;
/*
// TODO: pBt->root
ret = tdbBtreeOpenImpl(pBt);
if (ret < 0) {
tdbOsFree(pBt);
return -1;
}
*/
*ppBt = pBt;
return 0;
}
@ -338,7 +380,6 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
ASSERT(pgno != 0);
pBt->root = pgno;
return 0;
}

View File

@ -64,6 +64,14 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
mkdir(dbname, 0755);
#ifdef USE_MAINDB
// open main db
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SPgno), NULL, pDb, &pDb->pMainDb);
if (ret < 0) {
return -1;
}
#endif
*ppDb = pDb;
return 0;
}
@ -72,6 +80,10 @@ int tdbClose(TDB *pDb) {
SPager *pPager;
if (pDb) {
#ifdef USE_MAINDB
if (pDb->pMainDb) tdbTbClose(pDb->pMainDb);
#endif
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {
pDb->pgrList = pPager->pNext;
tdbPagerClose(pPager);
@ -179,4 +191,4 @@ void tdbEnvRemovePager(TDB *pDb, SPager *pPager) {
if (pDb->nPgrHash > 8 && pDb->nPager < pDb->nPgrHash / 2) {
// TODO
}
}
}

View File

@ -174,6 +174,13 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
for (ppPage = &pPager->pDirty; (*ppPage) && TDB_PAGE_PGNO(*ppPage) < TDB_PAGE_PGNO(pPage);
ppPage = &((*ppPage)->pDirtyNext)) {
}
if (*ppPage && TDB_PAGE_PGNO(*ppPage) == TDB_PAGE_PGNO(pPage)) {
tdbUnrefPage(pPage);
return 0;
}
ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage));
pPage->pDirtyNext = *ppPage;
*ppPage = pPage;
@ -467,7 +474,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
}
TXN txn;
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
SBtreeInitPageArg iArg;
iArg.pBt = pBt;
iArg.flags = 0;
@ -487,6 +494,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
return -1;
}
/*
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &iArg, &txn);
if (ret < 0) {
return -1;
@ -499,6 +507,18 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
}
tdbPCacheRelease(pPager->pCache, pPage, &txn);
*/
i64 offset = pPager->pageSize * (pgno - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0);
return -1;
}
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
tdbOsFSync(pPager->fd);

View File

@ -16,7 +16,7 @@
#include "tdbInt.h"
struct STTB {
TDB * pEnv;
TDB *pEnv;
SBTree *pBt;
};
@ -25,12 +25,16 @@ struct STBC {
};
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
TTB * pTb;
TTB *pTb;
SPager *pPager;
int ret;
char fFullName[TDB_FILENAME_LEN];
SPage * pPage;
SPage *pPage;
SPgno pgno;
void *pKey = NULL;
int nKey = 0;
void *pData = NULL;
int nData = 0;
*ppTb = NULL;
@ -42,6 +46,48 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
// pTb->pEnv
pTb->pEnv = pEnv;
#ifdef USE_MAINDB
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, TDB_MAINDB_NAME);
if (strcmp(TDB_MAINDB_NAME, tbname)) {
pPager = tdbEnvGetPager(pEnv, fFullName);
if (!pPager) {
return -1;
}
ret = tdbTbGet(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pData, &nData);
if (ret < 0) {
// new pgno & insert into main db
pgno = 0;
} else {
pgno = *(SPgno *)pData;
tdbFree(pKey);
tdbFree(pData);
}
} else {
pPager = tdbEnvGetPager(pEnv, fFullName);
if (pPager == NULL) {
ret = tdbPagerOpen(pEnv->pCache, fFullName, &pPager);
if (ret < 0) {
return -1;
}
tdbEnvAddPager(pEnv, pPager);
pPager->pEnv = pEnv;
}
if (pPager->dbOrigSize > 0) {
pgno = 1;
} else {
pgno = 0;
}
}
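// lookup summary: ordinary tables consult main.tdb -- a hit reuses the stored root pgno, a miss
// leaves pgno == 0 so tdbBtreeOpen() below bootstraps a new root leaf and registers it; for
// main.tdb itself, root pgno 1 is reused when the file already has pages, otherwise it starts at 0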
#else
pPager = tdbEnvGetPager(pEnv, tbname);
if (pPager == NULL) {
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, tbname);
@ -53,10 +99,12 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
tdbEnvAddPager(pEnv, pPager);
}
#endif
ASSERT(pPager != NULL);
// pTb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, keyCmprFn, &(pTb->pBt));
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, &(pTb->pBt));
if (ret < 0) {
return -1;
}

View File

@ -150,7 +150,8 @@ struct SBTC {
};
// SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, tdb_cmpr_fn_t kcmpr, SBTree **ppBt);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr,
SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn);
@ -356,6 +357,12 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
return pCell;
}
#define USE_MAINDB
#ifdef USE_MAINDB
#define TDB_MAINDB_NAME "main.tdb"
#endif
struct STDB {
char *dbName;
char *jnName;
@ -365,6 +372,9 @@ struct STDB {
int nPager;
int nPgrHash;
SPager **pgrHash;
#ifdef USE_MAINDB
TTB *pMainDb;
#endif
};
struct SPager {
@ -381,6 +391,9 @@ struct SPager {
u8 inTran;
SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB
#ifdef USE_MAINDB
TDB *pEnv;
#endif
};
#ifdef __cplusplus

View File

@ -266,26 +266,27 @@
./test.sh -f tsim/tmq/snapshot1.sim
# --- stable
./test.sh -f tsim/stable/alter_comment.sim
./test.sh -f tsim/stable/alter_count.sim
./test.sh -f tsim/stable/alter_import.sim
./test.sh -f tsim/stable/alter_insert1.sim
./test.sh -f tsim/stable/alter_insert2.sim
./test.sh -f tsim/stable/alter_metrics.sim
./test.sh -f tsim/stable/column_add.sim
./test.sh -f tsim/stable/column_drop.sim
./test.sh -f tsim/stable/column_modify.sim
./test.sh -f tsim/stable/disk.sim
./test.sh -f tsim/stable/dnode3.sim
./test.sh -f tsim/stable/metrics.sim
./test.sh -f tsim/stable/refcount.sim
./test.sh -f tsim/stable/show.sim
./test.sh -f tsim/stable/values.sim
./test.sh -f tsim/stable/vnode3.sim
./test.sh -f tsim/stable/column_add.sim
./test.sh -f tsim/stable/column_drop.sim
./test.sh -f tsim/stable/column_modify.sim
./test.sh -f tsim/stable/tag_add.sim
./test.sh -f tsim/stable/tag_drop.sim
./test.sh -f tsim/stable/tag_filter.sim
./test.sh -f tsim/stable/tag_modify.sim
./test.sh -f tsim/stable/tag_rename.sim
./test.sh -f tsim/stable/alter_comment.sim
./test.sh -f tsim/stable/alter_count.sim
./test.sh -f tsim/stable/alter_insert1.sim
./test.sh -f tsim/stable/alter_insert2.sim
./test.sh -f tsim/stable/alter_import.sim
./test.sh -f tsim/stable/tag_filter.sim
./test.sh -f tsim/stable/values.sim
./test.sh -f tsim/stable/vnode3.sim
# --- for multi process mode
./test.sh -f tsim/user/basic.sim -m
@ -309,7 +310,7 @@
./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError2.sim
./test.sh -f tsim/valgrind/checkError3.sim
# jira ./test.sh -f tsim/valgrind/checkError4.sim
./test.sh -f tsim/valgrind/checkError4.sim
# --- vnode
# unsupport ./test.sh -f tsim/vnode/replica3_basic.sim
@ -367,7 +368,7 @@
./test.sh -f tsim/compute/diff2.sim
./test.sh -f tsim/compute/first.sim
./test.sh -f tsim/compute/interval.sim
# jira ./test.sh -f tsim/compute/last_row.sim
./test.sh -f tsim/compute/last_row.sim
./test.sh -f tsim/compute/last.sim
./test.sh -f tsim/compute/leastsquare.sim
./test.sh -f tsim/compute/max.sim
@ -416,7 +417,7 @@
./test.sh -f tsim/tag/4.sim
./test.sh -f tsim/tag/5.sim
# jira ./test.sh -f tsim/tag/6.sim
# jira ./test.sh -f tsim/tag/add.sim
./test.sh -f tsim/tag/add.sim
./test.sh -f tsim/tag/bigint.sim
./test.sh -f tsim/tag/binary_binary.sim
./test.sh -f tsim/tag/binary.sim

View File

@ -65,8 +65,6 @@ if $data00 != 19 then
return -1
endi
print =============== step7
sql select last_row(tbcol) from $mt
print ===> $data00

View File

@ -99,7 +99,7 @@ if $rows != 1 then
endi
#sql select * from information_schema.`streams`
sql select * from information_schema.user_tables
if $rows != 30 then
if $rows != 31 then
return -1
endi
#sql select * from information_schema.user_table_distributed
@ -197,7 +197,7 @@ if $rows != 1 then
endi
#sql select * from performance_schema.`streams`
sql select * from information_schema.user_tables
if $rows != 30 then
if $rows != 31 then
return -1
endi
#sql select * from information_schema.user_table_distributed

View File

@ -934,11 +934,8 @@ if $data79 != null then
endi
print ======== step9
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql select * from tb order by ts asc
if $rows != 8 then

View File

@ -604,9 +604,7 @@ sql_error alter table tb drop column a
print ======== step9
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 3000
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql select * from tb order by ts desc
if $rows != 7 then

View File

@ -69,8 +69,6 @@ if $rows != 2 then
return -1
endi
sleep 100
print =============== step2
$i = 1
$tb = $tbPrefix . $i

View File

@ -24,7 +24,7 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol1 tinyint, tgcol2 int,
$i = 0
while $i < 5
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( 0, 0, 0, 0, 0 )
sql create table $tb using $mt tags( 0, 0, 0, 0, '0' )
$x = 0
while $x < $rowNum
$ms = $x . m
@ -35,7 +35,7 @@ while $i < 5
endw
while $i < 10
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( 1, 1, 1, 1, 1 )
sql create table $tb using $mt tags( 1, 1, 1, 1, '1' )
$x = 0
while $x < $rowNum
$ms = $x . m

View File

@ -250,8 +250,8 @@ sql alter table $mt add tag tgcol6 binary(10)
sql reset query cache
sql alter table $tb set tag tgcol4=false
sql alter table $tb set tag tgcol5=5
sql alter table $tb set tag tgcol6=6
sql alter table $tb set tag tgcol5='5'
sql alter table $tb set tag tgcol6='6'
sql reset query cache
sql select * from $mt where tgcol5 = '5'
@ -390,8 +390,8 @@ sql alter table $mt add tag tgcol5 binary(17)
sql alter table $mt add tag tgcol6 bool
sql reset query cache
sql alter table $tb set tag tgcol4=4
sql alter table $tb set tag tgcol5=5
sql alter table $tb set tag tgcol6=1
sql alter table $tb set tag tgcol5='5'
sql alter table $tb set tag tgcol6='1'
sql reset query cache
sql select * from $mt where tgcol5 = '5'
@ -409,7 +409,7 @@ endi
if $data03 != 5 then
return -1
endi
if $data04 != 1 then
if $data04 != 0 then
return -1
endi
@ -425,7 +425,7 @@ $i = 9
$mt = $mtPrefix . $i
$tb = $tbPrefix . $i
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol1 double, tgcol2 binary(10), tgcol3 binary(10))
sql create table $tb using $mt tags( 1, 2, '3' )
sql create table $tb using $mt tags( 1, '2', '3' )
sql insert into $tb values(now, 1)
sql select * from $mt where tgcol2 = '2'
if $rows != 1 then
@ -520,7 +520,7 @@ sql alter table $mt add tag tgcol4 binary(10)
sql alter table $mt add tag tgcol5 bool
sql reset query cache
sql alter table $tb set tag tgcol4=4
sql alter table $tb set tag tgcol4='4'
sql alter table $tb set tag tgcol5=false
sql reset query cache
@ -598,9 +598,9 @@ sql alter table $mt add tag tgcol7 bigint
sql alter table $mt add tag tgcol8 smallint
sql reset query cache
sql alter table $tb set tag tgcol4=4
sql alter table $tb set tag tgcol4='4'
sql alter table $tb set tag tgcol5=5
sql alter table $tb set tag tgcol6=6
sql alter table $tb set tag tgcol6='6'
sql alter table $tb set tag tgcol7=7
sql alter table $tb set tag tgcol8=8
sql reset query cache
@ -687,11 +687,11 @@ sql alter table $mt add tag tgcol5 bigint
sql reset query cache
sql alter table $tb set tag tgcol1=false
sql alter table $tb set tag tgcol2=5
sql alter table $tb set tag tgcol2='5'
sql alter table $tb set tag tgcol3=4
sql alter table $tb set tag tgcol4=3
sql alter table $tb set tag tgcol4='3'
sql alter table $tb set tag tgcol5=2
sql alter table $tb set tag tgcol6=1
sql alter table $tb set tag tgcol6='1'
sql reset query cache
sql select * from $mt where tgcol4 = '3'
@ -783,8 +783,8 @@ sql alter table $mt add tag tgcol4 int
sql alter table $mt add tag tgcol6 bigint
sql reset query cache
sql alter table $tb set tag tgcol1=7
sql alter table $tb set tag tgcol2=8
sql alter table $tb set tag tgcol1='7'
sql alter table $tb set tag tgcol2='8'
sql alter table $tb set tag tgcol3=9
sql alter table $tb set tag tgcol4=10
sql alter table $tb set tag tgcol5=11

View File

@ -40,19 +40,19 @@ if $data03 != 2 then
return -1
endi
sql alter table $mt change tag tagcx tgcol3 -x step21
sql alter table $mt rename tag tagcx tgcol3 -x step21
return -1
step21:
sql alter table $mt change tag tgcol1 tgcol2 -x step22
sql alter table $mt rename tag tgcol1 tgcol2 -x step22
return -1
step22:
#sql alter table $mt change tag tgcol1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx -x step20
#sql alter table $mt rename tag tgcol1 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx -x step20
# return -1
#step20:
sql alter table $mt change tag tgcol1 tgcol3
sql alter table $mt change tag tgcol2 tgcol4
sql alter table $mt change tag tgcol4 tgcol3 -x step23
sql alter table $mt rename tag tgcol1 tgcol3
sql alter table $mt rename tag tgcol2 tgcol4
sql alter table $mt rename tag tgcol4 tgcol3 -x step23
return -1
step23:
@ -77,8 +77,8 @@ if $data03 != 2 then
return -1
endi
sql alter table $mt change tag tgcol1 tgcol3
sql alter table $mt change tag tgcol2 tgcol4
sql alter table $mt rename tag tgcol1 tgcol3
sql alter table $mt rename tag tgcol2 tgcol4
print =============== step4
$i = 4
@ -101,8 +101,8 @@ if $data03 != 2.00000 then
return -1
endi
sql alter table $mt change tag tgcol1 tgcol3
sql alter table $mt change tag tgcol2 tgcol4
sql alter table $mt rename tag tgcol1 tgcol3
sql alter table $mt rename tag tgcol2 tgcol4
print =============== step5
$i = 5
@ -125,8 +125,8 @@ if $data03 != 2 then
return -1
endi
sql alter table $mt change tag tgcol1 tgcol3
sql alter table $mt change tag tgcol2 tgcol4
sql alter table $mt rename tag tgcol1 tgcol3
sql alter table $mt rename tag tgcol2 tgcol4
print =============== step6
$i = 6
@ -163,15 +163,14 @@ endi
sql alter table $mt drop tag tgcol3
sql reset query cache
sql alter table $mt change tag tgcol4 tgcol3
sql alter table $mt change tag tgcol1 tgcol7
sql alter table $mt change tag tgcol2 tgcol8
sql alter table $mt rename tag tgcol4 tgcol3
sql alter table $mt rename tag tgcol1 tgcol7
sql alter table $mt rename tag tgcol2 tgcol8
sql reset query cache
sql alter table $mt change tag tgcol3 tgcol9
sql alter table $mt change tag tgcol5 tgcol10
sql alter table $mt change tag tgcol6 tgcol11
sql alter table $mt rename tag tgcol3 tgcol9
sql alter table $mt rename tag tgcol5 tgcol10
sql alter table $mt rename tag tgcol6 tgcol11
sleep 3000
sql reset query cache
print =============== step2

View File

@ -21,12 +21,11 @@ sql create table tb4 using st1 tags(4);
sql insert into tb4 select * from tb1;
goto _OVER
sql select * from tb4;
if $rows != 2 then
return -1
endi
sql insert into tb4 select ts,f1,f2 from st1;
sql select * from tb4;
if $rows != 6 then
@ -59,4 +58,4 @@ endi
if $system_content == $null then
return -1
endi
endi

View File

@ -104,7 +104,7 @@ if $rows != 1 then
endi
sql select * from information_schema.user_tables
if $rows != 30 then
if $rows != 31 then
return -1
endi
@ -152,7 +152,7 @@ endi
system_content sh/checkValgrind.sh -n dnode2
print cmd return result ----> [ $system_content ]
if $system_content > 2 then
if $system_content > 4 then
return -1
endi

View File

@ -60,8 +60,8 @@ sql select c1, c2, c3 from ct1
sql select ts, c1, c2, c3 from stb
sql select * from ct1 where ts < now -1d and ts > now +1d
sql select * from stb where ts < now -1d and ts > now +1d
#sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc
#sql select * from stb where ts < now -1d and ts > now +1d order by ts desc
sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc
sql select * from stb where ts < now -1d and ts > now +1d order by ts desc
print =============== step7: count
sql select count(*) from ct1;

View File

@ -57,7 +57,6 @@ sql drop table d1.stb2
sql drop database d1
sql drop database d2
goto _OVER
print =============== step7: repeat
sql create database d1 vgroups 2 buffer 3
sql create database d2 vgroups 2 buffer 3

View File

@ -0,0 +1,82 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== step1: create drop show dnodes
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ---> dnode not ready!
return -1
endi
sql show dnodes
print ---> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
print =============== step2: create db
sql create database db
sql use db
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
print =============== step3: alter stb
sql_error alter table db.stb add column ts int
sql alter table db.stb add column c3 int
sql alter table db.stb add column c4 bigint
sql alter table db.stb add column c5 binary(12)
sql alter table db.stb drop column c1
sql alter table db.stb drop column c4
sql alter table db.stb MODIFY column c2 binary(32)
sql alter table db.stb add tag t4 bigint
sql alter table db.stb add tag c1 int
sql alter table db.stb add tag t5 binary(12)
sql alter table db.stb drop tag c1
sql alter table db.stb drop tag t5
sql alter table db.stb MODIFY tag t3 binary(32)
sql alter table db.stb rename tag t1 tx
sql alter table db.stb comment 'abcde' ;
goto _OVER
print =============== step4: alter tb
sql create table tb (ts timestamp, a int)
sql insert into tb values(now-28d, -28)
sql select count(a) from tb
sql alter table tb add column b smallint
sql insert into tb values(now-25d, -25, 0)
sql select count(b) from tb
sql alter table tb add column c tinyint
sql insert into tb values(now-22d, -22, 3, 0)
sql select count(c) from tb
sql alter table tb add column d int
sql insert into tb values(now-19d, -19, 6, 0, 0)
sql select count(d) from tb
sql alter table tb add column e bigint
sql alter table tb add column f float
sql alter table tb add column g double
sql alter table tb add column h binary(10)
sql select count(a), count(b), count(c), count(d), count(e), count(f), count(g), count(h) from tb
sql select * from tb order by ts desc
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
$null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 0 then
return -1
endi
if $system_content == $null then
return -1
endi

View File

@ -0,0 +1,77 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== step1: create drop show dnodes
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ---> dnode not ready!
return -1
endi
sql show dnodes
print ---> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
$tbPrefix = tb
$tbNum = 5
$rowNum = 10
print =============== step2: prepare data
sql create database db vgroups 2
sql use db
sql create table if not exists stb (ts timestamp, tbcol int, tbcol2 float, tbcol3 double) tags (tgcol int unsigned)
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x , $x , $x )
$x = $x + 1
endw
$i = $i + 1
endw
print =============== step3: avg
sql select avg(tbcol) from tb1
sql select avg(tbcol) from tb1 where ts <= 1601481840000
sql select avg(tbcol) as b from tb1
sql select avg(tbcol) as b from tb1 interval(1d)
goto _OVER
sql select avg(tbcol) as b from tb1 where ts <= 1601481840000s interval(1m)
sql select avg(tbcol) as c from stb
sql select avg(tbcol) as c from stb where ts <= 1601481840000
sql select avg(tbcol) as c from stb where tgcol < 5 and ts <= 1601481840000
sql select avg(tbcol) as c from stb interval(1m)
sql select avg(tbcol) as c from stb interval(1d)
sql select avg(tbcol) as b from stb where ts <= 1601481840000s interval(1m)
sql select avg(tbcol) as c from stb group by tgcol
sql select avg(tbcol) as b from stb where ts <= 1601481840000s partition by tgcol interval(1m)
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
$null=
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content > 0 then
return -1
endi
if $system_content == $null then
return -1
endi

View File

@ -43,13 +43,48 @@ class TDTestCase:
tdSql.execute('create database db vgroups 1')
tdSql.execute('use db')
print("============== STEP 1 ===== prepare data & validate json string")
i = 0
# add 100000 tables
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 100000:
    sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
    tdSql.execute(sql)
    i = i + 1
# do query
i = 0
while i <= 100000:
    sql = """select count(*) from jsons1 where jtag->'tag1' = %d"""%(i)
    tdSql.query(sql)
    if 1 != tdSql.getRows():
        print("err: %s"%(sql))
    i = i + 1
while i <= 10000000:
    sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
    tdSql.execute(sql)
    i = i + 1
i = 0
# drop super table
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 100000:
    sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
    tdSql.execute(sql)
    i = i + 1
tdSql.execute('drop stable jsons1')
# drop database
i = 0
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 100000:
    f = "insert into jsons1_{} using jsons1 tags('{\"tag1\":\"fff\",\"tag2\":{}, \"tag3\":true}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')".format
    sql = f(i, i)
    sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
    tdSql.execute(sql)
    i = i + 1
tdSql.execute('drop database db')
# test duplicate key using the first one. eliminate empty key
#tdSql.execute("CREATE TABLE if not exists jsons1_8 using jsons1 tags('{\"tag1\":null, \"tag1\":true, \"tag1\":45, \"1tag$\":2, \" \":90, \"\":32}')") tdSql.query("select jtag from jsons1_8") tdSql.checkRows(0);

View File

@ -426,7 +426,7 @@ class TDTestCase:
tdSql.query(" select unique(t1+c1) from stb1 ")
tdSql.checkRows(13)
tdSql.query(" select unique(t1+c1) from stb1 partition by tbname ")
tdSql.checkRows(13)
tdSql.checkRows(20)
tdSql.query(" select unique(t1) from stb1 partition by tbname ")
tdSql.checkRows(2)

@ -1 +1 @@
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6
Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343