Merge pull request #25522 from taosdata/enh/TD-29801-3.0

enh: batch create table
This commit is contained in:
Hongze Cheng 2024-04-28 17:16:23 +08:00 committed by GitHub
commit 0253aed220
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 100 additions and 26 deletions

View File

@ -60,6 +60,7 @@ typedef enum {
int32_t checkAndGetCryptKey(const char *encryptCode, const char *machineId, char **key);
int32_t generateEncryptCode(const char *key, const char *machineId, char **encryptCode);
int64_t grantRemain(EGrantType grant);
int32_t grantCheck(EGrantType grant);
int32_t grantCheckExpire(EGrantType grant);
char *tGetMachineId();

View File

@ -1161,16 +1161,15 @@ int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
int32_t nVgroup = 0;
if (tDecodeI32(&decoder, &nVgroup) < 0) goto _exit;
if (nVgroup > 0) {
pReq->pVloads = taosArrayInit(nVgroup, sizeof(SVnodeLoadLite));
pReq->pVloads = taosArrayInit_s(sizeof(SVnodeLoadLite), nVgroup);
if (!pReq->pVloads) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite vload;
if (tDecodeI32(&decoder, &(vload.vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload.nTimeSeries)) < 0) goto _exit;
taosArrayPush(pReq->pVloads, &vload);
SVnodeLoadLite *vload = TARRAY_GET_ELEM(pReq->pVloads, i);
if (tDecodeI32(&decoder, &(vload->vgId)) < 0) goto _exit;
if (tDecodeI64(&decoder, &(vload->nTimeSeries)) < 0) goto _exit;
}
}

View File

@ -49,7 +49,7 @@ typedef struct SDnodeMgmt {
// dmHandle.c
SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -169,23 +169,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
void dmSendNotifyReq(SDnodeMgmt *pMgmt) {
SNotifyReq req = {0};
taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.dnodeId = pMgmt->pData->dnodeId;
taosThreadRwlockUnlock(&pMgmt->pData->lock);
req.clusterId = pMgmt->pData->clusterId;
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
void * pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
void *pHead = rpcMallocCont(contLen);
tSerializeSNotifyReq(pHead, contLen, pReq);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "tgrant.h"
#include "thttp.h"
static void *dmStatusThreadFp(void *param) {
@ -47,21 +48,97 @@ static void *dmStatusThreadFp(void *param) {
}
SDmNotifyHandle dmNotifyHdl = {.state = 0};
#define TIMESERIES_STASH_NUM 5
static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-notify");
if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
return NULL;
}
bool wait = true;
// calculate approximate timeSeries per second
int64_t notifyTimeStamp[TIMESERIES_STASH_NUM];
int64_t notifyTimeSeries[TIMESERIES_STASH_NUM];
int64_t approximateTimeSeries = 0;
uint64_t nTotalNotify = 0;
int32_t head, tail = 0;
bool wait = true;
int32_t nDnode = 0;
int64_t lastNotify = 0;
int64_t lastFetchDnode = 0;
SNotifyReq req = {0};
while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (wait) tsem_wait(&dmNotifyHdl.sem);
atomic_store_8(&dmNotifyHdl.state, 1);
dmSendNotifyReq(pMgmt);
int64_t remainTimeSeries = grantRemain(TSDB_GRANT_TIMESERIES);
if (remainTimeSeries == INT64_MAX || remainTimeSeries <= 0) {
goto _skip;
}
int64_t current = taosGetTimestampMs();
if (current - lastFetchDnode > 1000) {
nDnode = dmGetDnodeSize(pMgmt->pData);
if (nDnode < 1) nDnode = 1;
lastFetchDnode = current;
}
if (req.dnodeId == 0 || req.clusterId == 0) {
req.dnodeId = pMgmt->pData->dnodeId;
req.clusterId = pMgmt->pData->clusterId;
}
if (current - lastNotify < 10) {
int64_t nCmprTimeSeries = approximateTimeSeries / 100;
if (nCmprTimeSeries < 1e5) nCmprTimeSeries = 1e5;
if (remainTimeSeries > nCmprTimeSeries * 10) {
taosMsleep(10);
} else if (remainTimeSeries > nCmprTimeSeries * 5) {
taosMsleep(5);
} else {
taosMsleep(2);
}
}
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsLiteFp)(&vinfo);
req.pVloads = vinfo.pVloads;
int32_t nVgroup = taosArrayGetSize(req.pVloads);
int64_t nTimeSeries = 0;
for (int32_t i = 0; i < nVgroup; ++i) {
SVnodeLoadLite *vload = TARRAY_GET_ELEM(req.pVloads, i);
nTimeSeries += vload->nTimeSeries;
}
notifyTimeSeries[tail] = nTimeSeries;
notifyTimeStamp[tail] = taosGetTimestampNs();
++nTotalNotify;
approximateTimeSeries = 0;
if (nTotalNotify >= TIMESERIES_STASH_NUM) {
head = tail - TIMESERIES_STASH_NUM + 1;
if (head < 0) head += TIMESERIES_STASH_NUM;
int64_t timeDiff = notifyTimeStamp[tail] - notifyTimeStamp[head];
int64_t tsDiff = notifyTimeSeries[tail] - notifyTimeSeries[head];
if (tsDiff > 0) {
if (timeDiff > 0 && timeDiff < 1e9) {
approximateTimeSeries = (double)tsDiff * 1e9 / timeDiff;
if ((approximateTimeSeries * nDnode) > remainTimeSeries) {
dmSendNotifyReq(pMgmt, &req);
}
} else {
dmSendNotifyReq(pMgmt, &req);
}
}
} else {
dmSendNotifyReq(pMgmt, &req);
}
if (++tail == TIMESERIES_STASH_NUM) tail = 0;
tFreeSNotifyReq(&req);
lastNotify = taosGetTimestampMs();
_skip:
if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
wait = true;
continue;

View File

@ -182,6 +182,7 @@ TdFilePtr dmCheckRunning(const char *dataDir);
int32_t dmInitDndInfo(SDnodeData *pData);
// dmEps.c
int32_t dmGetDnodeSize(SDnodeData *pData);
int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData);
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);

View File

@ -355,6 +355,14 @@ _OVER:
return code;
}
int32_t dmGetDnodeSize(SDnodeData *pData) {
int32_t size = 0;
taosThreadRwlockRdlock(&pData->lock);
size = taosArrayGetSize(pData->dnodeEps);
taosThreadRwlockUnlock(&pData->lock);
return size;
}
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
taosThreadRwlockWrlock(&pData->lock);
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);

View File

@ -77,6 +77,7 @@ void grantParseParameter() { mError("can't parsed parameter k"); }
void grantReset(SMnode *pMnode, EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {}
int64_t grantRemain(EGrantType grant) { return 0; }
char *tGetMachineId() { return NULL; };
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }