Merge branch '3.0' into feature/3_liaohj

This commit is contained in:
Haojun Liao 2022-07-18 10:01:32 +08:00 committed by GitHub
commit 95db2fb8a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 397 additions and 179 deletions

View File

@ -268,6 +268,26 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes
} SSortExecInfo;
//======================================================================================================================
// for grant
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t grantCheck(EGrantType grant);
#ifdef __cplusplus
}
#endif

View File

@ -139,6 +139,7 @@ extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -152,6 +152,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_HB_TIMER, "grant-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)

View File

@ -172,13 +172,8 @@ typedef struct tExprNode {
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
typedef enum {
SHOULD_FREE_COLDATA = 0x1, // the newly created column data needs to be destroyed.
DELEGATED_MGMT_COLDATA = 0x2, // input column data should not be released.
} ECOLDATA_MGMT_TYPE_E;
struct SScalarParam {
ECOLDATA_MGMT_TYPE_E type;
bool colAlloced;
SColumnInfoData *columnData;
SHashObj *pHashFilter;
int32_t hashValueType;

View File

@ -114,6 +114,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}

View File

@ -184,6 +184,7 @@ int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60;
int32_t tsGrantHBInterval = 60;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);

View File

@ -46,6 +46,7 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(SRpcMsg *pMsg);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -331,7 +331,8 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
code = 0;

View File

@ -144,6 +144,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_SYSTABLE_RETRIEVE:
code = dmProcessRetrieve(pMgmt, pMsg);
break;
case TDMT_MND_GRANT:
code = dmProcessGrantReq(pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
break;

View File

@ -206,7 +206,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -166,6 +166,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case WRITE_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;

View File

@ -26,6 +26,7 @@ int32_t mndInitCluster(SMnode *pMnode);
void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
#ifdef __cplusplus
}

View File

@ -29,6 +29,7 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps);
#ifdef __cplusplus
}

View File

@ -22,27 +22,10 @@
#include "mndInt.h"
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t mndInitGrant();
int32_t mndInitGrant(SMnode *pMnode);
void mndCleanupGrant();
void grantParseParameter();
int32_t grantCheck(EGrantType grant);
void grantReset(EGrantType grant, uint64_t value);
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value);
void grantAdd(EGrantType grant, uint64_t value);
void grantRestore(EGrantType grant, uint64_t value);

View File

@ -79,6 +79,23 @@ int64_t mndGetClusterId(SMnode *pMnode) {
return clusterId;
}
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int64_t createTime = INT64_MAX;
while (1) {
SClusterObj *pCluster = NULL;
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
createTime = pCluster->createdTime;
sdbRelease(pSdb, pCluster);
}
return createTime;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -509,6 +509,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
SUserObj *pUser = NULL;
SCreateDbReq createReq = {0};
// code = grantCheck(TSDB_GRANT_DB);
// if (code != 0) {
// terrno = code;
// goto _OVER;
// }
if (tDeserializeSCreateDbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;

View File

@ -262,7 +262,7 @@ bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
return true;
}
static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
SSdb *pSdb = pMnode->pSdb;
int32_t numOfEps = 0;
@ -621,6 +621,12 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0};
// code = grantCheck(TSDB_GRANT_DNODE);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = code;
// goto _OVER;
// }
if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;

View File

@ -118,17 +118,21 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
return numOfRows;
}
static int32_t mndProcessGrantHB(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; }
int32_t mndInitGrant(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant);
mndSetMsgHandle(pMnode, TDMT_MND_GRANT_HB_TIMER, mndProcessGrantHB);
return 0;
}
void mndCleanupGrant() {}
void grantParseParameter() { mError("can't parsed parameter k"); }
int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
void grantReset(EGrantType grant, uint64_t value) {}
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
int32_t dmProcessGrantReq(SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
#endif

View File

@ -90,6 +90,16 @@ static void mndPullupTelem(SMnode *pMnode) {
}
}
static void mndGrantHeartBeat(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void *mndThreadFp(void *param) {
SMnode *pMnode = param;
int64_t lastTime = 0;
@ -115,6 +125,10 @@ static void *mndThreadFp(void *param) {
if (lastTime % (tsTelemInterval * 10) == 0) {
mndPullupTelem(pMnode);
}
if (lastTime % (tsGrantHBInterval * 10) == 0) {
mndGrantHeartBeat(pMnode);
}
}
return NULL;
@ -402,6 +416,9 @@ int32_t mndStart(SMnode *pMnode) {
}
mndSetRestore(pMnode, true);
}
grantReset(pMnode, TSDB_GRANT_ALL, 0);
return mndInitTimer(pMnode);
}

View File

@ -363,6 +363,12 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
goto _OVER;
}
// code = grantCheck(TSDB_GRANT_USER);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = code;
// goto _OVER;
// }
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

View File

@ -99,7 +99,8 @@ STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
tb_uid_t metaGetTableEntryUidByName(SMeta* pMeta, const char* name);
int metaGetTbNum(SMeta* pMeta);
int64_t metaGetTbNum(SMeta* pMeta);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);

View File

@ -463,12 +463,18 @@ _err:
return code;
}
int metaGetTbNum(SMeta *pMeta) {
// N.B. Called by statusReq per second
int64_t metaGetTbNum(SMeta *pMeta) {
// TODO
// ASSERT(0);
return 0;
}
// N.B. Called by statusReq per second
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// TODO
return 400;
}
typedef struct {
SMeta *pMeta;
TBC *pCur;

View File

@ -3186,6 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
*suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
*suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -239,9 +239,9 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = syncGetMyRole(pVnode->sync);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = 400;
pLoad->totalStorage = 300;
pLoad->compStorage = 200;
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
pLoad->totalStorage = (int64_t)3 * 1073741824;
pLoad->compStorage = (int64_t)2 * 1073741824;
pLoad->pointsWritten = 100;
pLoad->numOfSelectReqs = 1;
pLoad->numOfInsertReqs = 3;

View File

@ -175,6 +175,7 @@ _err:
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
int32_t code = 0;
SVnode *pVnode = pWriter->pVnode;
if (pWriter->pMetaSnapWriter) {
code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
@ -186,8 +187,31 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
if (code) goto _err;
}
if (!rollback) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
pVnode->state.committed = pWriter->ever;
pVnode->state.applied = pWriter->ever;
// pVnode->state.applyTerm = ;
// pVnode->state.commitTerm = ;
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
code = vnodeSaveInfo(dir, &info);
if (code) goto _err;
code = vnodeCommitInfo(dir, &info);
if (code) goto _err;
} else {
ASSERT(0);
}
_exit:
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pWriter->pVnode), rollback);
vInfo("vgId:%d vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
taosMemoryFree(pWriter);
return code;

View File

@ -272,7 +272,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
taosMemoryFree(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
}
void ctgClearHandle(SCatalog* pCtg) {
@ -303,7 +303,7 @@ void ctgClearHandle(SCatalog* pCtg) {
CTG_CACHE_STAT_INC(numOfClear, 1);
ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId);
ctgInfo("handle cleared, clusterId:0x%" PRIx64, clusterId);
}
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {

View File

@ -90,6 +90,7 @@ _return:
tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData);
taosMemoryFree(param);
return TSDB_CODE_SUCCESS;
@ -283,6 +284,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosArrayDestroy(pInserter->pDataBlocks);
taosMemoryFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
taosThreadMutexDestroy(&pInserter->mutex);
return TSDB_CODE_SUCCESS;
}

View File

@ -629,6 +629,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata);
numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList);
@ -684,6 +685,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
ASSERT(pResult->info.capacity > 0);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
colDataDestroy(&idata);
numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList);

View File

@ -1802,83 +1802,6 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
return pBlock;
}
// TODO: check more datatype, json? and return detailed error when len is not enough
static int32_t convertTagDataToTagVarchar(int8_t tagType, char* tagVal, uint32_t tagLen, char* varData,
int32_t bufSize) {
int outputLen = -1;
switch (tagType) {
case TSDB_DATA_TYPE_TINYINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int8_t*)tagVal));
break;
case TSDB_DATA_TYPE_UTINYINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%u", *((uint8_t*)tagVal));
break;
case TSDB_DATA_TYPE_SMALLINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int16_t*)tagVal));
break;
case TSDB_DATA_TYPE_USMALLINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%u", *((uint16_t*)tagVal));
break;
case TSDB_DATA_TYPE_INT:
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int32_t*)tagVal));
break;
case TSDB_DATA_TYPE_UINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%u", *((uint32_t*)tagVal));
break;
case TSDB_DATA_TYPE_BIGINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%" PRId64, *((int64_t*)tagVal));
break;
case TSDB_DATA_TYPE_UBIGINT:
outputLen = snprintf(varDataVal(varData), bufSize, "%" PRIu64, *((uint64_t*)tagVal));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(tagVal);
outputLen = snprintf(varDataVal(varData), bufSize, "%f", fv);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(tagVal);
outputLen = snprintf(varDataVal(varData), bufSize, "%lf", dv);
break;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON: {
memcpy(varDataVal(varData), tagVal, tagLen);
outputLen = tagLen;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
outputLen = snprintf(varDataVal(varData), bufSize, "%" PRId64, *((int64_t*)tagVal));
break;
case TSDB_DATA_TYPE_BOOL:
outputLen = snprintf(varDataVal(varData), bufSize, "%d", *((int8_t*)tagVal));
break;
default:
return TSDB_CODE_FAILED;
}
if (outputLen < 0 || outputLen == bufSize && !IS_VAR_DATA_TYPE(tagType) || outputLen > bufSize) {
return TSDB_CODE_FAILED;
}
varDataSetLen(varData, outputLen);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
@ -1962,10 +1885,9 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
tagVal.cid = smr.me.stbEntry.schemaTag.pSchema[i].colId;
char* tagData = NULL;
uint32_t tagLen = 0;
if (tagType == TSDB_DATA_TYPE_JSON) {
// TODO: json type?+varheader+data
tagData = varDataVal(pInfo->pCur->mr.me.ctbEntry.pTags + 1);
tagLen = varDataLen(pInfo->pCur->mr.me.ctbEntry.pTags + 1);
tagData = (char*)pInfo->pCur->mr.me.ctbEntry.pTags;
} else {
bool exist = tTagGet((STag*)pInfo->pCur->mr.me.ctbEntry.pTags, &tagVal);
if (exist) {
@ -1979,27 +1901,27 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
}
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
char* tagVarChar = NULL;
if (tagData != NULL) {
if (tagType == TSDB_DATA_TYPE_JSON) {
char* tagJson = parseTagDatatoJson(tagData);
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
varDataSetLen(tagVarChar, strlen(tagJson));
taosMemoryFree(tagJson);
} else {
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
tagVarChar = taosMemoryMalloc(bufSize);
code = convertTagDataToTagVarchar(tagType, tagData, tagLen, tagVarChar, bufSize);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
taosMemoryFree(tagVarChar);
metaReaderClear(&smr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
int32_t len = -1;
dataConverToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
varDataSetLen(tagVarChar, len);
}
}
pColInfoData = taosArrayGet(p->pDataBlock, 5);
colDataAppend(pColInfoData, numOfRows, tagVarChar,
(tagData == NULL) || (tagType == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(tagData)));
taosMemoryFree(tagVarChar);
++numOfRows;
}
metaReaderClear(&smr);

View File

@ -1132,9 +1132,9 @@ static bool validateStateOper(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
return (0 == strcasecmp(varDataVal(pVal->datum.p), "GT") || 0 == strcasecmp(varDataVal(pVal->datum.p), "GE") ||
0 == strcasecmp(varDataVal(pVal->datum.p), "LT") || 0 == strcasecmp(varDataVal(pVal->datum.p), "LE") ||
0 == strcasecmp(varDataVal(pVal->datum.p), "EQ") || 0 == strcasecmp(varDataVal(pVal->datum.p), "NE"));
return (0 == strncasecmp(varDataVal(pVal->datum.p), "GT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "GE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "LT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "LE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "EQ", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "NE", 2));
}
static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
@ -2419,6 +2419,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getStateFuncEnv,
.initFunc = functionSetup,
.processFunc = stateCountFunction,
.sprocessFunc = stateCountScalarFunction,
.finalizeFunc = NULL
},
{

View File

@ -4469,17 +4469,17 @@ bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
static int8_t getStateOpType(char* opStr) {
int8_t opType;
if (strcasecmp(opStr, "LT") == 0) {
if (strncasecmp(opStr, "LT", 2) == 0) {
opType = STATE_OPER_LT;
} else if (strcasecmp(opStr, "GT") == 0) {
} else if (strncasecmp(opStr, "GT", 2) == 0) {
opType = STATE_OPER_GT;
} else if (strcasecmp(opStr, "LE") == 0) {
} else if (strncasecmp(opStr, "LE", 2) == 0) {
opType = STATE_OPER_LE;
} else if (strcasecmp(opStr, "GE") == 0) {
} else if (strncasecmp(opStr, "GE", 2) == 0) {
opType = STATE_OPER_GE;
} else if (strcasecmp(opStr, "NE") == 0) {
} else if (strncasecmp(opStr, "NE", 2) == 0) {
opType = STATE_OPER_NE;
} else if (strcasecmp(opStr, "EQ") == 0) {
} else if (strncasecmp(opStr, "EQ", 2) == 0) {
opType = STATE_OPER_EQ;
} else {
opType = STATE_OPER_INVALID;

View File

@ -855,6 +855,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
memcpy(output->columnData,
taosArrayGet(input->pDataBlock, 0),
sizeof(SColumnInfoData));
output->colAlloced = true;
return 0;
}

View File

@ -19,8 +19,8 @@
const static uint32_t STATE_LIMIT = 1000;
static int dfaInstsEqual(const void *a, const void *b, size_t size) {
SArray *ar = (SArray *)a;
SArray *br = (SArray *)b;
SArray *ar = *(SArray **)a;
SArray *br = *(SArray **)b;
size_t al = ar != NULL ? taosArrayGetSize(ar) : 0;
size_t bl = br != NULL ? taosArrayGetSize(br) : 0;
if (al != bl) {
@ -71,8 +71,8 @@ FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) {
dfaAdd(builder->dfa, cur, 0);
SArray *states = taosArrayInit(0, sizeof(uint32_t));
uint32_t result;
SArray *states = taosArrayInit(0, sizeof(uint32_t));
if (dfaBuilderCacheState(builder, cur, &result)) {
taosArrayPush(states, &result);
}
@ -146,10 +146,9 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
*result = *v;
taosArrayDestroy(tinsts);
} else {
DfaState st;
st.insts = tinsts;
st.isMatch = isMatch;
DfaState st = {.insts = tinsts, .isMatch = isMatch};
taosArrayPush(builder->dfa->states, &st);
int32_t sz = taosArrayGetSize(builder->dfa->states) - 1;
taosHashPut(builder->cache, &tinsts, sizeof(POINTER_BYTES), &sz, sizeof(sz));
*result = sz;

View File

@ -85,11 +85,12 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
blk->blockId = blkId;
blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
assert(blk->nread <= kBlockSize);
nread = TMIN(blkLeft, len);
if (blk->nread < kBlockSize && blk->nread < len) {
break;
}
nread = TMIN(blkLeft, len);
memcpy(buf + total, blk->buf + blkOffset, nread);
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,

View File

@ -78,8 +78,8 @@ bool sparSetContains(FstSparseSet *ss, int32_t ip) {
if (ip >= ss->cap || ip < 0) {
return false;
}
int32_t i = ss->sparse[ip];
int32_t i = ss->sparse[ip];
if (i >= 0 && i < ss->cap && i < ss->size && ss->dense[i] == ip) {
return true;
} else {

View File

@ -952,6 +952,7 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
nodesDestroyList(pSink->pCols);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {

View File

@ -55,7 +55,7 @@ int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarPara
}
pParam->columnData = pColumnData;
pParam->type = SHOULD_FREE_COLDATA;
pParam->colAlloced = true;
return TSDB_CODE_SUCCESS;
}
@ -166,6 +166,10 @@ void sclFreeRes(SHashObj *res) {
}
void sclFreeParam(SScalarParam *param) {
if (!param->colAlloced) {
return;
}
if (param->columnData != NULL) {
colDataDestroy(param->columnData);
taosMemoryFreeClear(param->columnData);
@ -173,6 +177,7 @@ void sclFreeParam(SScalarParam *param) {
if (param->pHashFilter != NULL) {
taosHashCleanup(param->pHashFilter);
param->pHashFilter = NULL;
}
}
@ -191,6 +196,19 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) {
return TSDB_CODE_SUCCESS;
}
void sclFreeParamList(SScalarParam *param, int32_t paramNum) {
if (NULL == param) {
return;
}
for (int32_t i = 0; i < paramNum; ++i) {
SScalarParam* p = param + i;
sclFreeParam(p);
}
taosMemoryFree(param);
}
int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t *rowNum) {
switch (nodeType(node)) {
case QUERY_NODE_LEFT_VALUE: {
@ -225,11 +243,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(scalarGenerateSetFromList((void **)&param->pHashFilter, node, type));
param->hashValueType = type;
param->colAlloced = true;
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pHashFilter);
param->pHashFilter = NULL;
sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
param->colAlloced = false;
break;
}
case QUERY_NODE_COLUMN: {
@ -274,6 +295,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*param = *res;
param->colAlloced = false;
break;
}
default:
@ -455,11 +477,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
_return:
for (int32_t i = 0; i < paramNum; ++i) {
// sclFreeParamNoData(params + i);
}
taosMemoryFreeClear(params);
sclFreeParamList(params, paramNum);
SCL_RET(code);
}
@ -533,11 +551,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
_return:
for (int32_t i = 0; i < paramNum; ++i) {
// sclFreeParamNoData(params + i);
}
taosMemoryFreeClear(params);
sclFreeParamList(params, paramNum);
SCL_RET(code);
}
@ -573,14 +587,8 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
code = terrno;
_return:
for (int32_t i = 0; i < paramNum; ++i) {
if (params[i].type == SHOULD_FREE_COLDATA) {
colDataDestroy(params[i].columnData);
taosMemoryFreeClear(params[i].columnData);
}
}
taosMemoryFreeClear(params);
sclFreeParamList(params, paramNum);
SCL_RET(code);
}
@ -871,7 +879,6 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
@ -906,7 +913,6 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
output.type = DELEGATED_MGMT_COLDATA;
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;

View File

@ -2427,3 +2427,153 @@ int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *
int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return sumScalarFunction(pInput, inputNum, pOutput);
}
typedef enum {
STATE_OPER_INVALID = 0,
STATE_OPER_LT,
STATE_OPER_GT,
STATE_OPER_LE,
STATE_OPER_GE,
STATE_OPER_NE,
STATE_OPER_EQ,
} EStateOperType;
#define STATE_COMP(_op, _lval, _rval, _rtype) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_rval, _rtype))
#define GET_STATE_VAL(_val, _type) ((_type == TSDB_DATA_TYPE_BIGINT) ? (*(int64_t *)_val) : (*(double *)_val))
#define STATE_COMP_IMPL(_op, _lval, _rval) \
do { \
switch (_op) { \
case STATE_OPER_LT: \
return ((_lval) < (_rval)); \
break; \
case STATE_OPER_GT: \
return ((_lval) > (_rval)); \
break; \
case STATE_OPER_LE: \
return ((_lval) <= (_rval)); \
break; \
case STATE_OPER_GE: \
return ((_lval) >= (_rval)); \
break; \
case STATE_OPER_NE: \
return ((_lval) != (_rval)); \
break; \
case STATE_OPER_EQ: \
return ((_lval) == (_rval)); \
break; \
default: \
break; \
} \
} while (0)
static int8_t getStateOpType(char* opStr) {
int8_t opType;
if (strncasecmp(opStr, "LT", 2) == 0) {
opType = STATE_OPER_LT;
} else if (strncasecmp(opStr, "GT", 2) == 0) {
opType = STATE_OPER_GT;
} else if (strncasecmp(opStr, "LE", 2) == 0) {
opType = STATE_OPER_LE;
} else if (strncasecmp(opStr, "GE", 2) == 0) {
opType = STATE_OPER_GE;
} else if (strncasecmp(opStr, "NE", 2) == 0) {
opType = STATE_OPER_NE;
} else if (strncasecmp(opStr, "EQ", 2) == 0) {
opType = STATE_OPER_EQ;
} else {
opType = STATE_OPER_INVALID;
}
return opType;
}
static bool checkStateOp(int8_t op, SColumnInfoData* pCol, int32_t index, SScalarParam *pCondParam) {
char* data = colDataGetData(pCol, index);
char* param = pCondParam->columnData->pData;
int32_t paramType = GET_PARAM_TYPE(pCondParam);
switch (pCol->info.type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t v = *(uint8_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t v = *(uint16_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t v = *(uint32_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t v = *(uint64_t*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float v = *(float*)data;
STATE_COMP(op, v, param, paramType);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double v = *(double*)data;
STATE_COMP(op, v, param, paramType);
break;
}
default: {
ASSERT(0);
}
}
return false;
}
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
int8_t op = getStateOpType(varDataVal(pInput[1].columnData->pData));
int64_t count = 0;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
colDataAppendNULL(pOutputData, i);
continue;
}
bool ret = checkStateOp(op, pInputData, i, &pInput[2]);
int64_t out = -1;
if (ret) {
out = ++count;
} else {
count = 0;
}
colDataAppend(pOutputData, i, (char*)&out, false);
}
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}

View File

@ -207,7 +207,7 @@ void flttMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) {
void initScalarParam(SScalarParam* pParam) {
memset(pParam, 0, sizeof(SScalarParam));
pParam->type = SHOULD_FREE_COLDATA;
pParam->colAlloced = true;
}
}

View File

@ -21,12 +21,11 @@ sql create table tb4 using st1 tags(4);
sql insert into tb4 select * from tb1;
goto _OVER
sql select * from tb4;
if $rows != 2 then
return -1
endi
sql insert into tb4 select ts,f1,f2 from st1;
sql select * from tb4;
if $rows != 6 then

View File

@ -152,7 +152,7 @@ endi
system_content sh/checkValgrind.sh -n dnode2
print cmd return result ----> [ $system_content ]
if $system_content > 6 then
if $system_content > 4 then
return -1
endi

View File

@ -60,8 +60,8 @@ sql select c1, c2, c3 from ct1
sql select ts, c1, c2, c3 from stb
sql select * from ct1 where ts < now -1d and ts > now +1d
sql select * from stb where ts < now -1d and ts > now +1d
#sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc
#sql select * from stb where ts < now -1d and ts > now +1d order by ts desc
sql select * from ct1 where ts < now -1d and ts > now +1d order by ts desc
sql select * from stb where ts < now -1d and ts > now +1d order by ts desc
print =============== step7: count
sql select count(*) from ct1;

View File

@ -43,13 +43,48 @@ class TDTestCase:
tdSql.execute('create database db vgroups 1')
tdSql.execute('use db')
print("============== STEP 1 ===== prepare data & validate json string")
i = 0
# add 100000 table
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 10 0000:
sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
tdSql.execute(sql)
i = i + 1
// do query
i = 0
while i <= 10 0000:
sql = """select count(*) from jsons1 where jtag->'tag1' = %d"""%(i)
tdSql.query(sql)
if 1 != tdSql.getRows():
print("err: %s"%(sql))
while i <= 10000000
sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
tdSql.execute(sql)
i = i + 1
i = 0
# drop super table
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 100000:
sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
tdSql.execute(sql)
i = i + 1
tdSql.execute('drop stable jsons1')
# drop database
i = 0
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 100000:
f = "insert into jsons1_{} using jsons1 tags('{\"tag1\":\"fff\",\"tag2\":{}, \"tag3\":true}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')".format
sql = f(i, i)
sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
tdSql.execute(sql)
i = i + 1
tdSql.execute('drop database db')
# test duplicate key using the first one. elimate empty key
#tdSql.execute("CREATE TABLE if not exists jsons1_8 using jsons1 tags('{\"tag1\":null, \"tag1\":true, \"tag1\":45, \"1tag$\":2, \" \":90, \"\":32}')") tdSql.query("select jtag from jsons1_8") tdSql.checkRows(0);