This commit is contained in:
Hongze Cheng 2024-09-24 18:04:58 +08:00
parent b55aec4b13
commit e16af0e45c
11 changed files with 57 additions and 31 deletions

View File

@ -216,7 +216,7 @@ void metaCacheClose(SMeta* pMeta) {
}
}
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
int32_t code = 0;
int32_t nBucket;
@ -228,8 +228,7 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
if (aBucket == NULL) {
code = terrno;
goto _exit;
return;
}
// rehash
@ -250,9 +249,7 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
taosMemoryFree(pCache->sEntryCache.aBucket);
pCache->sEntryCache.nBucket = nBucket;
pCache->sEntryCache.aBucket = aBucket;
_exit:
return code;
return;
}
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
@ -279,7 +276,7 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
}
} else { // insert
if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
TAOS_UNUSED(metaRehashCache(pCache, 1));
metaRehashCache(pCache, 1);
iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
}
@ -317,7 +314,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
pCache->sEntryCache.nEntry--;
if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
TAOS_UNUSED(metaRehashCache(pCache, 0));
metaRehashCache(pCache, 0);
}
} else {
code = TSDB_CODE_NOT_FOUND;

View File

@ -60,7 +60,7 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
pMeta->path = (char *)&pMeta[1];
strcpy(pMeta->path, path);
(void)taosRealPath(pMeta->path, NULL, strlen(path) + 1);
int32_t ret = taosRealPath(pMeta->path, NULL, strlen(path) + 1);
pMeta->pVnode = pVnode;
@ -98,7 +98,7 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
TSDB_CHECK_CODE(code, lino, _exit);
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
TAOS_UNUSED(taosMkDir(indexFullPath));
ret = taosMkDir(indexFullPath);
SIndexOpts opts = {.cacheSize = 8 * 1024 * 1024};
code = indexOpen(&opts, indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);

View File

@ -285,7 +285,9 @@ static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
if (deltaTS > tsTimeSeriesThreshold) {
if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) {
(void)tsem_post(&dmNotifyHdl.sem);
if (tsem_post(&dmNotifyHdl.sem) != 0) {
metaError("vgId:%d, failed to post semaphore, errno:%d", TD_VID(pMeta->pVnode), errno);
}
}
}
#endif

View File

@ -820,7 +820,10 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
if (atomic_load_8(&pInfo->assigned) == 0) {
(void)tsem_post(&(pRSmaStat->notEmpty));
if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
tstrerror(terrno));
}
}
// smoothing consume
@ -1385,7 +1388,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
if (!ret) {
smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8 " since tmr reset failed, rsetId:%d refId:%" PRIi64,
smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
" since tmr reset failed, rsetId:%d refId:%" PRIi64,
SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
}
}
@ -1407,7 +1411,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
atomic_store_8(&pItem->fetchLevel, 1);
if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
(void)tsem_post(&(pStat->notEmpty));
if (tsem_post(&(pStat->notEmpty)) != 0) {
smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
}
}
} break;
case TASK_TRIGGER_STAT_INACTIVE: {

View File

@ -413,7 +413,7 @@ static void tsdbCommitInfoDestroy(STsdb *pTsdb) {
if (pTsdb->commitInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
TAOS_UNUSED(vHashDrop(pTsdb->commitInfo->ht, info));
int32_t ret = vHashDrop(pTsdb->commitInfo->ht, info);
tsdbTFileSetClear(&info->fset);
taosMemoryFree(info);
}
@ -514,7 +514,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
SFileSetCommitInfo tinfo = {
.fid = fid,
};
TAOS_UNUSED(vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info));
int32_t ret = vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
if (info == NULL) {
TAOS_CHECK_GOTO(tsdbCommitInfoAdd(tsdb, fid), &lino, _exit);
}
@ -538,7 +538,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
};
// check if the file set already on the commit list
TAOS_UNUSED(vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info));
int32_t ret = vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
if (info != NULL) {
continue;
}

View File

@ -63,7 +63,9 @@ static void destroy_fs(STFileSystem **fs) {
TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
(void)tsem_destroy(&fs[0]->canEdit);
if (tsem_destroy(&fs[0]->canEdit) != 0) {
tsdbError("failed to destroy semaphore");
}
taosMemoryFree(fs[0]);
fs[0] = NULL;
}
@ -808,7 +810,11 @@ void tsdbEnableBgTask(STsdb *pTsdb) {
void tsdbCloseFS(STFileSystem **fs) {
if (fs[0] == NULL) return;
TAOS_UNUSED(tsdbDisableAndCancelAllBgTask((*fs)->tsdb));
int32_t code = tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
tstrerror(code));
}
close_file_system(fs[0]);
destroy_fs(fs);
return;
@ -838,7 +844,9 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
}
(void)tsem_wait(&fs->canEdit);
if (tsem_wait(&fs->canEdit) != 0) {
tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
}
fs->etype = etype;
// edit
@ -944,13 +952,17 @@ _exit:
} else {
tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
}
(void)tsem_post(&fs->canEdit);
if (tsem_post(&fs->canEdit) != 0) {
tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
}
return code;
}
int32_t tsdbFSEditAbort(STFileSystem *fs) {
int32_t code = abort_edit(fs);
(void)tsem_post(&fs->canEdit);
if (tsem_post(&fs->canEdit) != 0) {
tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
}
return code;
}

View File

@ -436,7 +436,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
if (code != TSDB_CODE_SUCCESS) {
if (handle) {
(void)tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
}
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -447,7 +447,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
memcpy(pFD->pBuf, pPage, pFD->szPage);
(void)tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
// check
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {

View File

@ -609,7 +609,10 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
SValue value;
if (pRow->type == TSDBROW_ROW_FMT) {
(void)tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
int32_t ret = tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
if (ret != 0) {
tsdbError("failed to get column value, code:%d", ret);
}
} else if (pRow->type == TSDBROW_COL_FMT) {
if (iCol == 0) {
*pColVal =

View File

@ -432,7 +432,7 @@ static void vnodeAsyncLaunchWorker(SVAsync *async) {
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
continue;
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
TAOS_UNUSED(taosThreadJoin(async->workers[i].thread, NULL));
int32_t ret = taosThreadJoin(async->workers[i].thread, NULL);
async->workers[i].state = EVA_WORKER_STATE_UINIT;
}

View File

@ -414,7 +414,10 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->blocked = false;
pVnode->disableWrite = false;
(void)tsem_init(&pVnode->syncSem, 0, 0);
if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
goto _err;
}
(void)taosThreadMutexInit(&pVnode->mutex, NULL);
(void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);

View File

@ -1833,7 +1833,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
}
if (info.suid) {
(void)metaGetInfo(pVnode->pMeta, info.suid, &info, NULL);
if (metaGetInfo(pVnode->pMeta, info.suid, &info, NULL) != 0) {
vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), info.suid);
}
}
if (pSubmitTbData->sver != info.skmVer) {