chore: timeseries distribute

This commit is contained in:
kailixu 2023-09-19 08:29:51 +08:00
parent 8111c18cc4
commit 6698861c8b
14 changed files with 201 additions and 30 deletions

View File

@ -1511,6 +1511,22 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
void tFreeSStatusReq(SStatusReq* pReq);
typedef struct {
int32_t vgId;
int64_t nTimeSeries;
} SDndNotifyInfo;
int32_t dmProcessNotifyReq(SDndNotifyInfo* pInfo);
typedef struct {
int32_t nVgroup;
SDndNotifyInfo* payload;
} SNotifyReq;
int32_t tSerializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq);
int32_t tDeserializeSNotifyReq(void* buf, int32_t bufLen, SNotifyReq* pReq);
void tFreeSNotifyReq(SNotifyReq* pReq);
typedef struct {
int32_t dnodeId;
int64_t clusterId;

View File

@ -179,6 +179,8 @@ enum { // WARN: new msg should be appended to segment tail
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_HEARTBEAT, "stream-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE, "retrieve-ip-white", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST, "get-user-whitelist", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_NOTIFY, "notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_NOTIFY, "grant-notify", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)

View File

@ -44,6 +44,7 @@ int32_t dmRun();
*/
void dmStop();
#ifdef __cplusplus
}
#endif

View File

@ -1035,6 +1035,52 @@ int32_t tDeserializeSMDropFullTextReq(void *buf, int32_t bufLen, SMDropFullTextR
return 0;
}
int32_t tSerializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->nVgroup) < 0) return -1;
for (int32_t i = 0; i < pReq->nVgroup; ++i) {
if (tEncodeI32(&encoder, (pReq->payload + i)->vgId) < 0) return -1;
if (tEncodeI64(&encoder, (pReq->payload + i)->nTimeSeries) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSNotifyReq(void *buf, int32_t bufLen, SNotifyReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->nVgroup) < 0) return -1;
pReq->payload = taosMemoryMalloc(pReq->nVgroup * (sizeof(SDndNotifyInfo)));
if (!pReq->payload) return -1;
for (int32_t i = 0; i < pReq->nVgroup; ++i) {
if (tDecodeI32(&decoder, &((pReq->payload + i)->vgId)) < 0) return -1;
if (tDecodeI64(&decoder, &((pReq->payload + i)->nTimeSeries)) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSNotifyReq(SNotifyReq *pReq) {
if (pReq) {
taosMemoryFreeClear(pReq->payload);
}
}
int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);

View File

@ -50,6 +50,8 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg);
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg);
int32_t dmStartNotify(SDnodeMgmt *pMgmt);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -130,11 +130,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
taosThreadRwlockUnlock(&pMgmt->pData->lock);
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo);
(*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads;
SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo);
(*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load;
(*pMgmt->getQnodeLoadsFp)(&req.qload);
@ -198,7 +198,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp statusRsp = {0};
SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo);
(*pMgmt->getMnodeLoadsFp)(&minfo);
if (minfo.isMnode &&
(minfo.load.syncState == TAOS_SYNC_STATE_ERROR || minfo.load.syncState == TAOS_SYNC_STATE_OFFLINE)) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
@ -207,7 +207,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
}
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo);
(*pMgmt->getVnodeLoadsFp)(&vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR || pLoad->syncState == TAOS_SYNC_STATE_OFFLINE) {
@ -395,6 +395,7 @@ SArray *dmGetMsgHandles() {
// Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_NOTIFY, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
code = 0;
@ -407,3 +408,99 @@ _OVER:
return pArray;
}
}
#ifndef TD_ENTERPRISE
int32_t dmStartNotify(SDnodeMgmt *pMgmt) { return 0; }
int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo) { return 0; }
#else
static SHashObj *tsDndNotifyInfo = NULL;
static int64_t tsTimeSeries = 0;
static int8_t tsDndNotifyInitLock = 0;
static int8_t tsDndNotifyLock = 0;
static SDnodeMgmt *pSDnodeMgmt = NULL;
int32_t dmStartNotify(SDnodeMgmt *pMgmt) {
pSDnodeMgmt = pMgmt;
return 0;
}
int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo) {
if (atomic_load_8(&tsDndNotifyInitLock) != 2) {
if (atomic_val_compare_exchange_8(&tsDndNotifyInitLock, 0, 1) == 0) {
tsDndNotifyInfo = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
// fetch the latest tsTimeSeries of vgroups in current dnode
atomic_store_8(&tsDndNotifyInitLock, 2);
} else {
int32_t nLoops = 0;
while (atomic_load_8(&tsDndNotifyInitLock) != 2) {
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
}
}
if (atomic_val_compare_exchange_8(&tsDndNotifyLock, 0, 1) != 0) {
return 0;
}
int64_t lastTimeSeries = atomic_load_64(&tsTimeSeries);
int64_t *val = NULL;
if ((val = taosHashGet(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId)))) {
if (*val != pInfo->nTimeSeries) {
assert(*val > 0);
atomic_add_fetch_64(&tsTimeSeries, pInfo->nTimeSeries - *val);
taosHashPut(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId), &pInfo->nTimeSeries, sizeof(pInfo->nTimeSeries));
}
} else {
atomic_add_fetch_64(&tsTimeSeries, pInfo->nTimeSeries);
taosHashPut(tsDndNotifyInfo, &pInfo->vgId, sizeof(pInfo->vgId), &pInfo->nTimeSeries, sizeof(pInfo->nTimeSeries));
}
if (atomic_load_64(&tsTimeSeries) - lastTimeSeries > 100) {
int32_t hashSize = taosHashGetSize(tsDndNotifyInfo);
SNotifyReq req = {.nVgroup = hashSize};
req.payload = taosMemoryMalloc(sizeof(SDndNotifyInfo) * hashSize);
int64_t *nVal = NULL;
int32_t iter = 0;
int64_t nnn = 0;
size_t kLen = sizeof(int32_t);
while ((nVal = taosHashIterate(tsDndNotifyInfo, nVal)) && iter < hashSize) {
int32_t *nVgId = taosHashGetKey(nVal, &kLen);
(req.payload + iter)->vgId = *nVgId;
(req.payload + iter)->nTimeSeries = *nVal;
++iter;
}
// assert(nnn < 1000000);
int32_t contLen = tSerializeSNotifyReq(NULL, 0, &req);
if (contLen <= 0) return -1;
void *pHead = rpcMallocCont(contLen);
if (!pHead) {
tFreeSNotifyReq(&req);
atomic_store_8(&tsDndNotifyLock, 0);
return -1;
}
tSerializeSNotifyReq(pHead, contLen, &req);
tFreeSNotifyReq(&req);
SRpcMsg rpcMsg = {.pCont = pHead,
.contLen = contLen,
.msgType = TDMT_MND_NOTIFY,
.info.ahandle = NULL,
.info.refId = 0,
.info.noResp = 1};
SEpSet epSet = {0};
dmGetMnodeEpSet(pSDnodeMgmt->pData, &epSet);
rpcSendRequest(pSDnodeMgmt->msgCb.clientRpc, &epSet, &rpcMsg, NULL);
}
atomic_store_8(&tsDndNotifyLock, 0);
return 0;
}
#endif

View File

@ -26,7 +26,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartCrashReportThread(pMgmt) != 0) {
return -1;
}
if (dmStartNotifyThread(pMgmt) != 0) {
if(dmStartNotify(pMgmt) != 0) {
return -1;
}
return 0;

View File

@ -133,21 +133,6 @@ static void *dmCrashReportThreadFp(void *param) {
return NULL;
}
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->statusThread, &thAttr, dmStatusThreadFp, pMgmt) != 0) {
dError("failed to create status thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-status", "initialized");
return 0;
}
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
@ -266,6 +251,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_MND_GRANT:
code = dmProcessGrantReq(&pMgmt->pData->clusterId, pMsg);
break;
case TDMT_MND_GRANT_NOTIFY:
code = dmProcessGrantNotify(NULL, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in mgmt queue", pMsg);

View File

@ -178,6 +178,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -92,6 +92,8 @@ typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
int32_t dmProcessNotifyReq(SDndNotifyInfo *pInfo);
typedef struct {
int32_t dnodeId;
int64_t clusterId;

View File

@ -79,6 +79,14 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue);
#ifndef TD_ENTERPRISE
static int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; }
static int32_t mndProcessNotifyReq(SRpcMsg *pReq) { return 0; }
#else
int32_t mndUpdateClusterInfo(SRpcMsg *pReq);
int32_t mndProcessNotifyReq(SRpcMsg *pReq);
#endif
int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_DNODE,
@ -96,6 +104,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CONFIG_DNODE, mndProcessConfigDnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CONFIG_DNODE_RSP, mndProcessConfigDnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STATUS, mndProcessStatusReq);
mndSetMsgHandle(pMnode, TDMT_MND_NOTIFY, mndProcessNotifyReq);
mndSetMsgHandle(pMnode, TDMT_MND_DNODE_LIST, mndProcessDnodeListReq);
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
@ -480,12 +489,6 @@ static bool mndUpdateMnodeState(SMnodeObj *pObj, SMnodeLoad *pMload) {
return stateChanged;
}
#ifndef TD_ENTERPRISE
static int32_t mndUpdateClusterInfo(SRpcMsg *pReq) { return 0; }
#else
int32_t mndUpdateClusterInfo(SRpcMsg *pReq);
#endif
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStatusReq statusReq = {0};
@ -537,6 +540,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
int64_t nDiffTimeSeries = 0;
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);

View File

@ -288,6 +288,7 @@ typedef struct {
int64_t numOfSTables;
int64_t numOfCTables;
int64_t numOfNTables;
int64_t numOfCmprTimeSeries;
int64_t numOfNTimeSeries;
int64_t numOfTimeSeries;
int64_t itvTimeSeries;

View File

@ -36,6 +36,7 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
pInfo->uid = pEntry->uid;
pInfo->version = pEntry->version;
@ -736,6 +737,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
metaReaderClear(&mr);
// build SMetaEntry
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
me.version = ver;
me.type = pReq->type;
me.uid = pReq->uid;
@ -769,7 +771,9 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
}
#endif
++pMeta->pVnode->config.vndStats.numOfCTables;
++pStats->numOfCTables;
pStats->numOfTimeSeries += 2; // 2 cols for test.
metaWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1);
@ -784,11 +788,18 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
++pMeta->pVnode->config.vndStats.numOfNTables;
pMeta->pVnode->config.vndStats.numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
++pStats->numOfNTables;
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
}
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
// assert(pStats->numOfTimeSeries + pStats->numOfNTimeSeries < 200000);
// if (pStats->numOfTimeSeries + pStats->numOfNTimeSeries - pStats->numOfCmprTimeSeries > 100) {
// pStats->numOfCmprTimeSeries = pStats->numOfTimeSeries + pStats->numOfNTimeSeries;
// SDndNotifyInfo dNotifyInfo = {.vgId = pMeta->pVnode->config.vgId,
// .nTimeSeries = pStats->numOfTimeSeries + pStats->numOfNTimeSeries};
// dmProcessNotifyReq(&dNotifyInfo);
// }
atomic_val_compare_exchange_8(&tsNeedUpdStatus, 0, 1);

View File

@ -388,7 +388,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 0);
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
pLoad->totalStorage = (int64_t)3 * 1073741824;
pLoad->compStorage = (int64_t)2 * 1073741824;
pLoad->pointsWritten = 100;