Merge branch '3.0' into feat/agg_client_api

This commit is contained in:
Ganlin Zhao 2022-07-18 14:51:59 +08:00
commit 46a2d9bad4
16 changed files with 123 additions and 53 deletions

View File

@ -268,26 +268,6 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes int32_t readBytes; // read io bytes
} SSortExecInfo; } SSortExecInfo;
//======================================================================================================================
// for grant
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t grantCheck(EGrantType grant);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

47
include/common/tgrant.h Normal file
View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_GRANT_H_
#define _TD_COMMON_GRANT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
typedef enum {
TSDB_GRANT_ALL,
TSDB_GRANT_TIME,
TSDB_GRANT_USER,
TSDB_GRANT_DB,
TSDB_GRANT_TIMESERIES,
TSDB_GRANT_DNODE,
TSDB_GRANT_ACCT,
TSDB_GRANT_STORAGE,
TSDB_GRANT_SPEED,
TSDB_GRANT_QUERY_TIME,
TSDB_GRANT_CONNS,
TSDB_GRANT_STREAMS,
TSDB_GRANT_CPU_CORES,
} EGrantType;
int32_t grantCheck(EGrantType grant);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_GRANT_H_*/

View File

@ -153,9 +153,15 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); terrno = TSDB_CODE_GRANT_EXPIRED;
taosWriteQitem(pVnode->pQueryQ, pMsg); code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr());
} else {
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
}
break; break;
case STREAM_QUEUE: case STREAM_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
@ -166,9 +172,14 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg); taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case WRITE_QUEUE: case WRITE_QUEUE:
if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg); terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
taosWriteQitem(pVnode->pWriteQ, pMsg); code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
} else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
}
break; break;
case SYNC_QUEUE: case SYNC_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);

View File

@ -24,6 +24,7 @@
#include "tcache.h" #include "tcache.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tgrant.h"
#include "tqueue.h" #include "tqueue.h"
#include "ttime.h" #include "ttime.h"
#include "version.h" #include "version.h"

View File

@ -509,11 +509,10 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SCreateDbReq createReq = {0}; SCreateDbReq createReq = {0};
// code = grantCheck(TSDB_GRANT_DB); if ((terrno = grantCheck(TSDB_GRANT_DB)) != 0) {
// if (code != 0) { code = terrno;
// terrno = code; goto _OVER;
// goto _OVER; }
// }
if (tDeserializeSCreateDbReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDbReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;

View File

@ -621,11 +621,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0}; SCreateDnodeReq createReq = {0};
// code = grantCheck(TSDB_GRANT_DNODE); if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0) {
// if (code != TSDB_CODE_SUCCESS) { code = terrno;
// terrno = code; goto _OVER;
// goto _OVER; }
// }
if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;

View File

@ -363,11 +363,10 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
// code = grantCheck(TSDB_GRANT_USER); if ((terrno = grantCheck(TSDB_GRANT_USER)) != 0) {
// if (code != TSDB_CODE_SUCCESS) { code = terrno;
// terrno = code; goto _OVER;
// goto _OVER; }
// }
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq); code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

View File

@ -27,6 +27,7 @@
#include "wal.h" #include "wal.h"
#include "tcommon.h" #include "tcommon.h"
#include "tgrant.h"
#include "tfs.h" #include "tfs.h"
#include "tmsg.h" #include "tmsg.h"
#include "trow.h" #include "trow.h"

View File

@ -205,7 +205,7 @@ _query:
} }
tDecoderInit(&dc, pData, nData); tDecoderInit(&dc, pData, nData);
tDecodeSSchemaWrapper(&dc, &schema); tDecodeSSchemaWrapperEx(&dc, &schema);
pSchema = tCloneSSchemaWrapper(&schema); pSchema = tCloneSSchemaWrapper(&schema);
tDecoderClear(&dc); tDecoderClear(&dc);
@ -470,9 +470,9 @@ int64_t metaGetTbNum(SMeta *pMeta) {
} }
// N.B. Called by statusReq per second // N.B. Called by statusReq per second
int64_t metaGetTimeSeriesNum(SMeta *pMeta) { int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// TODO // TODO
return 400; return 400;
} }
typedef struct { typedef struct {

View File

@ -64,9 +64,16 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
qTaskInfo_t task = pExec->execCol.task[0]; qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG); if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
return 0; return 0;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
if (qStreamPrepareScan(task, pOffset) < 0) {
pRsp->rspOffset = *pOffset;
return 0;
}
}
} }
int32_t rowCnt = 0; int32_t rowCnt = 0;

View File

@ -630,6 +630,9 @@ _exit:
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen); tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp); tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
tEncoderClear(&ec); tEncoderClear(&ec);
if (vMetaRsp.pSchemas) {
taosMemoryFree(vMetaRsp.pSchemas);
}
return 0; return 0;
} }

View File

@ -96,9 +96,9 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
} }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo); bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);

View File

@ -72,8 +72,12 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
assert(pGroupResInfo != NULL); assert(pGroupResInfo != NULL);
taosArrayDestroy(pGroupResInfo->pRows); for(int32_t i = 0; i < taosArrayGetSize(pGroupResInfo->pRows); ++i) {
pGroupResInfo->pRows = NULL; SResKeyPos* pRes = taosArrayGetP(pGroupResInfo->pRows, i);
taosMemoryFree(pRes);
}
pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows);
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
} }

View File

@ -332,6 +332,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid; uid = pTableInfo->uid;
ts = INT64_MIN; ts = INT64_MIN;
} else {
return -1;
} }
} }
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/

View File

@ -377,6 +377,10 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
} }
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
colDataDestroy(pColData);
}
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
@ -3737,7 +3741,7 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosArrayDestroyEx(pInfo->groupResInfo.pRows, freeItem); cleanupGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }

View File

@ -1513,12 +1513,25 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static void freeItem(void* param) {
SGroupKeys *pKey = (SGroupKeys*) param;
taosMemoryFree(pKey->pData);
}
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pRecycledPages); pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages);
pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
pInfo->pPrevValues = NULL;
pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins);
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
cleanupGroupResInfo(&pInfo->groupResInfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }