enh: batch create table

This commit is contained in:
kailixu 2024-04-28 04:37:30 +08:00
parent 0b5e35643c
commit 504263315d
11 changed files with 91 additions and 45 deletions

View File

@ -178,7 +178,6 @@ extern int32_t tsMetaCacheMaxSize;
extern int32_t tsSlowLogThreshold;
extern int32_t tsSlowLogScope;
extern int32_t tsTimeSeriesThreshold;
extern int64_t tsTimeSeriesInterval;
extern bool tsMultiResultFunctionStarReturnTags;
// client

View File

@ -58,11 +58,12 @@ typedef enum {
TSDB_GRANT_BACKUP_RESTORE,
} EGrantType;
int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key);
int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode);
int32_t grantCheck(EGrantType grant);
int32_t grantCheckExpire(EGrantType grant);
char *tGetMachineId();
int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key);
int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode);
uint64_t grantRemain(EGrantType grant);
int32_t grantCheck(EGrantType grant);
int32_t grantCheckExpire(EGrantType grant);
char *tGetMachineId();
// #ifndef GRANTS_CFG
#ifdef TD_ENTERPRISE

View File

@ -181,7 +181,6 @@ int32_t tsMetaCacheMaxSize = -1; // MB
int32_t tsSlowLogThreshold = 3; // seconds
int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL;
int32_t tsTimeSeriesThreshold = 50;
int64_t tsTimeSeriesInterval = 10; // ms
bool tsMultiResultFunctionStarReturnTags = false;
/*
@ -784,8 +783,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
if (cfgAddInt64(pCfg, "timeseriesInterval", tsTimeSeriesInterval, 1, 100, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
@ -1242,7 +1239,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
tsTimeSeriesThreshold = cfgGetItem(pCfg, "timeseriesThreshold")->i32;
tsTimeSeriesInterval = cfgGetItem(pCfg, "timeseriesInterval")->i64;
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
@ -1561,7 +1557,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"numOfLogLines", &tsNumOfLogLines},
{"queryRspPolicy", &tsQueryRspPolicy},
{"timeseriesThreshold", &tsTimeSeriesThreshold},
{"timeseriesInterval", &tsTimeSeriesInterval},
{"tmqMaxTopicNum", &tmqMaxTopicNum},
{"tmqRowSize", &tmqRowSize},
{"transPullupInterval", &tsTransPullupInterval},

View File

@ -1161,16 +1161,15 @@ int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
int32_t nVgroup = 0;
if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit;
if (nVgroup > 0) {
pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite));
pReq->pVloads = taosArrayInit_s(sizeof(SVnodeLoadLite), nVgroup);
if (!pReq->pVloads) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite vload;
if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit;
taosArrayPush(pReq->pVloads, &vload);
SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i);
if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit;
}
}

View File

@ -49,7 +49,7 @@ typedef struct SDnodeMgmt {
// dmHandle.c
SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -169,22 +169,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
SNotifyReq req = {0};
taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.dnodeId = pMgmt->pData->dnodeId;
taosThreadRwlockUnlock(&pMgmt->pData->lock);
req.clusterId = pMgmt->pData->clusterId;
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
void * pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, &req);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
void *pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, pReq);
tFreeSNotifyReq(&req);
SRpcMsg rpcMsg = {.pCont = pHead,

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "tgrant.h"
#include "thttp.h"
static void *dmStatusThreadFp(void *param) {
@ -47,26 +48,85 @@ static void *dmStatusThreadFp(void *param) {
}
SDmNotifyHandle dmNotifyHdl = {.state = 0};
#define TIMESERIES_STASH_NUM 5
static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-notify");
if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
return NULL;
}
bool wait = true;
int64_t lastNotify = 0;
// calculate approximate timeSeries per second
int64_t notifyTimeStamp[TIMESERIES_STASH_NUM];
int64_t notifyTimeSeries[TIMESERIES_STASH_NUM];
uint64_t nTotalNotify = 0;
int32_t head = -1;
int32_t tail = 0;
bool wait = true;
int32_t nDnode = 0;
int64_t lastNotify = 0;
SNotifyReq req = {0};
while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (wait) tsem_wait(&dmNotifyHdl.sem);
atomic_store_8(&dmNotifyHdl.state, 1);
if (taosGetTimestampMs() - lastNotify < tsTimeSeriesInterval) {
taosMsleep(tsTimeSeriesInterval);
uint64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
goto _skip;
}
dmSendNotifyReq(pMgmt);
int64_t current = taosGetTimestampMs();
if (current - lastNotify > 1000) {
nDnode = dmGetDnodeSize(pMgmt->pData);
}
if (req.dnodeId == 0 || req.clusterId == 0) {
req.dnodeId = pMgmt->pData->dnodeId;
req.clusterId = pMgmt->pData->clusterId;
}
if (current - lastNotify < 10) {
if (remainTimeSeries > 1000000) {
taosMsleep(10);
} else if (remainTimeSeries > 500000) {
taosMsleep(5);
} else {
taosMsleep(2);
}
}
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t nVgroup = taosArrayGetSize(req.pVloads);
int64_t nTimeSeries = 0;
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
nTimeSeries += vload->nTimeSeries;
}
notifyTimeSeries[tail] = nTimeSeries;
notifyTimeStamp[tail] = taosGetTimestampNs();
++nTotalNotify;
uint64_t approximateTimeSeries = 0;
if (nTotalNotify >= TIMESERIES_STASH_NUM) {
int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
if (timeDiff > 0 && timeDiff < 1e9 && tsDiff > 0) {
approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
}
}
if (++head == TIMESERIES_STASH_NUM) head = 0;
if (++tail == TIMESERIES_STASH_NUM) tail = 0;
if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
dmSendNotifyReq(pMgmt, &req);
}
tFreeSNotifyReq(&req);
lastNotify = taosGetTimestampMs();
_skip:
if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
wait = true;
continue;

View File

@ -182,6 +182,7 @@ TdFilePtr dmCheckRunning(const char *dataDir);
int32_t dmInitDndInfo(SDnodeData *pData);
// dmEps.c
int32_t dmGetDnodeSize(SDnodeData *pData);
int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData);
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);

View File

@ -355,6 +355,14 @@ _OVER:
return code;
}
int32_t dmGetDnodeSize(SDnodeData *pData) {
int32_t size = 0;
taosThreadRwlockWrlock(&pData->lock);
size = taosArrayGetSize(pData->dnodeEps);
taosThreadRwlockUnlock(&pData->lock);
return size;
}
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
taosThreadRwlockWrlock(&pData->lock);
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);

View File

@ -77,6 +77,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); }
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
int64_t grantRemain(EGrantType grant) { return 0; }
char *tGetMachineId() { return NULL; };
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }

View File

@ -96,12 +96,6 @@ class TDTestCase:
"values": [0, 200, 2000],
"except_values": [-2, 2001]
},
{
"name": "timeseriesInterval",
"alias": "tsTimeSeriesInterval",
"values": [1, 10, 100],
"except_values": [-1, 0, 101]
},
{
"name": "minDiskFreeSize",
"alias": "tsMinDiskFreeSize",