Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_optimize

This commit is contained in:
Hongze Cheng 2022-08-30 11:28:19 +08:00
commit 4c847cde6b
6 changed files with 55 additions and 37 deletions

View File

@ -157,17 +157,17 @@ void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode); size_t tsdbCacheGetCapacity(SVnode *pVnode);
// tq // tq
typedef struct SMetaTableInfo{ typedef struct SMetaTableInfo {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
SSchemaWrapper *schema; SSchemaWrapper *schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
}SMetaTableInfo; } SMetaTableInfo;
typedef struct SIdInfo{ typedef struct SIdInfo {
int64_t version; int64_t version;
int32_t index; int32_t index;
}SIdInfo; } SIdInfo;
typedef struct SSnapContext { typedef struct SSnapContext {
SMeta *pMeta; SMeta *pMeta;
@ -180,8 +180,8 @@ typedef struct SSnapContext {
SArray *idList; SArray *idList;
int32_t index; int32_t index;
bool withMeta; bool withMeta;
bool queryMetaOrData; // true-get meta, false-get data bool queryMetaOrData; // true-get meta, false-get data
}SSnapContext; } SSnapContext;
typedef struct STqReader { typedef struct STqReader {
int64_t ver; int64_t ver;
@ -232,11 +232,12 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet); int32_t buildSnapContext(SMeta *pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); SSnapContext **ctxRet);
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx); int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid); SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
int32_t destroySnapContext(SSnapContext* ctx); int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
int32_t destroySnapContext(SSnapContext *ctx);
// structs // structs
struct STsdbCfg { struct STsdbCfg {
@ -259,6 +260,7 @@ typedef struct {
int64_t numOfNTables; int64_t numOfNTables;
int64_t numOfNTimeSeries; int64_t numOfNTimeSeries;
int64_t numOfTimeSeries; int64_t numOfTimeSeries;
int64_t itvTimeSeries;
int64_t pointsWritten; int64_t pointsWritten;
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;

View File

@ -615,9 +615,13 @@ int64_t metaGetTbNum(SMeta *pMeta) {
// N.B. Called by statusReq per second // N.B. Called by statusReq per second
int64_t metaGetTimeSeriesNum(SMeta *pMeta) { int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
int64_t num = 0; if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || ++pMeta->pVnode->config.vndStats.itvTimeSeries % 60 == 0) {
vnodeGetTimeSeriesNum(pMeta->pVnode, &num); int64_t num = 0;
pMeta->pVnode->config.vndStats.numOfTimeSeries = num; vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
pMeta->pVnode->config.vndStats.itvTimeSeries = 0;
}
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
} }
@ -890,7 +894,7 @@ const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
#ifdef TAG_FILTER_DEBUG #ifdef TAG_FILTER_DEBUG
if (IS_VAR_DATA_TYPE(val->type)) { if (IS_VAR_DATA_TYPE(val->type)) {
char* buf = taosMemoryCalloc(val->nData + 1, 1); char *buf = taosMemoryCalloc(val->nData + 1, 1);
memcpy(buf, val->pData, val->nData); memcpy(buf, val->pData, val->nData);
metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf); metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf);
taosMemoryFree(buf); taosMemoryFree(buf);
@ -900,13 +904,13 @@ const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval); metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval);
} }
SArray* pTagVals = NULL; SArray *pTagVals = NULL;
tTagToValArray((STag*)pTag, &pTagVals); tTagToValArray((STag *)pTag, &pTagVals);
for (int i = 0; i < taosArrayGetSize(pTagVals); i++) { for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
if (IS_VAR_DATA_TYPE(pTagVal->type)) { if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char* buf = taosMemoryCalloc(pTagVal->nData + 1, 1); char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
memcpy(buf, pTagVal->pData, pTagVal->nData); memcpy(buf, pTagVal->pData, pTagVal->nData);
metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf); metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf);
taosMemoryFree(buf); taosMemoryFree(buf);

View File

@ -312,15 +312,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
int32_t nLoops = 0;
// step 1: set rsma stat // step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
++nLoops;
if (nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
ASSERT(pRSmaStat->commitAppliedVer > 0); ASSERT(pRSmaStat->commitAppliedVer > 0);
// step 2: wait for all triggered fetch tasks to finish // step 2: wait for all triggered fetch tasks to finish
int32_t nLoops = 0;
while (1) { while (1) {
if (T_REF_VAL_GET(pStat) == 0) { if (T_REF_VAL_GET(pStat) == 0) {
smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));

View File

@ -21,6 +21,7 @@
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
#define RSMA_FETCH_DELAY_MAX (900000) // ms #define RSMA_FETCH_DELAY_MAX (900000) // ms
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms #define RSMA_FETCH_ACTIVE_MAX (1800) // ms
#define RSMA_FETCH_INTERVAL (5000) // ms
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.inited = 0, .inited = 0,
@ -1501,13 +1502,13 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
switch (rsmaTriggerStat) { switch (rsmaTriggerStat) {
case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: { case TASK_TRIGGER_STAT_CANCELLED: {
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8 smaDebug("vgId:%d, rsma fetch task not start for level %" PRIi8 " since stat is %" PRIi8
", rsetId rsetId:%" PRIi64 " refId:%d", ", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId);
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} }
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
return; return;
} }
default: default:
@ -1518,7 +1519,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE);
switch (fetchTriggerStat) { switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
pItem->fetchLevel = pItem->level; pItem->fetchLevel = pItem->level;
@ -1531,8 +1532,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
if (atomic_load_8(&pRSmaInfo->assigned) == 0) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
tsem_post(&(pStat->notEmpty)); tsem_post(&(pStat->notEmpty));
} }
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
pRSmaInfo->suid);
} break; } break;
case TASK_TRIGGER_STAT_PAUSED: { case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused",
@ -1715,15 +1714,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type);
} }
if (type == RSMA_EXEC_OVERFLOW) { if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) {
tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr);
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);
} }
if (qallItemSize > 0) { if (qallItemSize > 0) {
atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize);
continue; continue;
} else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) {
continue; if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) {
continue;
}
for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j);
if (pItem->fetchLevel) {
pItem->fetchLevel = 0;
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
}
}
} }
break; break;
@ -1775,7 +1784,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (pEnv->flag & SMA_ENV_FLG_CLOSE) { if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
break; break;
} }
tsem_wait(&pRSmaStat->notEmpty); tsem_wait(&pRSmaStat->notEmpty);
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {

View File

@ -295,11 +295,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
} }
if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) { if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) {
if (!pPrev) { *pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL;
*pIter = NULL;
} else {
*pIter = GET_SHASH_NODE_DATA(pPrev);
}
} }
FREE_HASH_NODE(pNode); FREE_HASH_NODE(pNode);

View File

@ -97,7 +97,7 @@ int32_t tqDebugFlag = 135;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int32_t metaDebugFlag = 135; int32_t metaDebugFlag = 135;
int32_t udfDebugFlag = 135; int32_t udfDebugFlag = 135;
int32_t smaDebugFlag = 135; int32_t smaDebugFlag = 131;
int32_t idxDebugFlag = 135; int32_t idxDebugFlag = 135;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;