Merge pull request #28075 from taosdata/enh/TD-31890-17

ehn: remove void
This commit is contained in:
Hongze Cheng 2024-09-24 20:17:18 +08:00 committed by GitHub
commit a9f170c59e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 237 additions and 131 deletions

View File

@ -115,7 +115,7 @@ typedef struct {
} SValueColumnCompressInfo; } SValueColumnCompressInfo;
int32_t tValueColumnInit(SValueColumn *valCol); int32_t tValueColumnInit(SValueColumn *valCol);
int32_t tValueColumnDestroy(SValueColumn *valCol); void tValueColumnDestroy(SValueColumn *valCol);
void tValueColumnClear(SValueColumn *valCol); void tValueColumnClear(SValueColumn *valCol);
int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value); int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value);
int32_t tValueColumnUpdate(SValueColumn *valCol, int32_t idx, const SValue *value); int32_t tValueColumnUpdate(SValueColumn *valCol, int32_t idx, const SValue *value);

View File

@ -4171,12 +4171,12 @@ int32_t tValueColumnInit(SValueColumn *valCol) {
return 0; return 0;
} }
int32_t tValueColumnDestroy(SValueColumn *valCol) { void tValueColumnDestroy(SValueColumn *valCol) {
valCol->type = TSDB_DATA_TYPE_NULL; valCol->type = TSDB_DATA_TYPE_NULL;
valCol->numOfValues = 0; valCol->numOfValues = 0;
tBufferDestroy(&valCol->data); tBufferDestroy(&valCol->data);
tBufferDestroy(&valCol->offsets); tBufferDestroy(&valCol->offsets);
return 0; return;
} }
void tValueColumnClear(SValueColumn *valCol) { void tValueColumnClear(SValueColumn *valCol) {

View File

@ -292,7 +292,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap, const char* id); int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap, const char *id);
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_t *suid);
@ -1069,6 +1069,13 @@ int32_t tsdbSnapPrepDescription(SVnode *pVnode, SSnapshot *pSnap);
void tsdbRemoveFile(const char *path); void tsdbRemoveFile(const char *path);
#define taosCloseFileWithLog(fd) \
do { \
if (taosCloseFile(fd) < 0) { \
tsdbTrace("failed to close file"); \
} \
} while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -216,7 +216,7 @@ void metaCacheClose(SMeta* pMeta) {
} }
} }
static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) { static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
int32_t code = 0; int32_t code = 0;
int32_t nBucket; int32_t nBucket;
@ -228,8 +228,7 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*)); SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
if (aBucket == NULL) { if (aBucket == NULL) {
code = terrno; return;
goto _exit;
} }
// rehash // rehash
@ -250,9 +249,7 @@ static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) {
taosMemoryFree(pCache->sEntryCache.aBucket); taosMemoryFree(pCache->sEntryCache.aBucket);
pCache->sEntryCache.nBucket = nBucket; pCache->sEntryCache.nBucket = nBucket;
pCache->sEntryCache.aBucket = aBucket; pCache->sEntryCache.aBucket = aBucket;
return;
_exit:
return code;
} }
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) { int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
@ -279,7 +276,7 @@ int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
} }
} else { // insert } else { // insert
if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) { if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
TAOS_UNUSED(metaRehashCache(pCache, 1)); metaRehashCache(pCache, 1);
iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket; iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
} }
@ -317,7 +314,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
pCache->sEntryCache.nEntry--; pCache->sEntryCache.nEntry--;
if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 && if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) { pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
TAOS_UNUSED(metaRehashCache(pCache, 0)); metaRehashCache(pCache, 0);
} }
} else { } else {
code = TSDB_CODE_NOT_FOUND; code = TSDB_CODE_NOT_FOUND;

View File

@ -60,7 +60,7 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
pMeta->path = (char *)&pMeta[1]; pMeta->path = (char *)&pMeta[1];
strcpy(pMeta->path, path); strcpy(pMeta->path, path);
(void)taosRealPath(pMeta->path, NULL, strlen(path) + 1); int32_t ret = taosRealPath(pMeta->path, NULL, strlen(path) + 1);
pMeta->pVnode = pVnode; pMeta->pVnode = pVnode;
@ -98,7 +98,7 @@ int32_t metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert"); sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
TAOS_UNUSED(taosMkDir(indexFullPath)); ret = taosMkDir(indexFullPath);
SIndexOpts opts = {.cacheSize = 8 * 1024 * 1024}; SIndexOpts opts = {.cacheSize = 8 * 1024 * 1024};
code = indexOpen(&opts, indexFullPath, (SIndex **)&pMeta->pTagIvtIdx); code = indexOpen(&opts, indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);

View File

@ -285,7 +285,9 @@ static inline void metaTimeSeriesNotifyCheck(SMeta *pMeta) {
int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries; int64_t deltaTS = nTimeSeries - pMeta->pVnode->config.vndStats.numOfReportedTimeSeries;
if (deltaTS > tsTimeSeriesThreshold) { if (deltaTS > tsTimeSeriesThreshold) {
if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) { if (0 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 2)) {
(void)tsem_post(&dmNotifyHdl.sem); if (tsem_post(&dmNotifyHdl.sem) != 0) {
metaError("vgId:%d, failed to post semaphore, errno:%d", TD_VID(pMeta->pVnode), errno);
}
} }
} }
#endif #endif

View File

@ -89,7 +89,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (pItem->tmrId) { if (pItem->tmrId) {
smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId,
pInfo->suid, i + 1); pInfo->suid, i + 1);
if(!taosTmrStopA(&pItem->tmrId)){ if (!taosTmrStopA(&pItem->tmrId)) {
smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid, smaError("vgId:%d, failed to stop fetch timer for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid,
i + 1); i + 1);
} }
@ -820,7 +820,10 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *p
int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1); int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1);
if (atomic_load_8(&pInfo->assigned) == 0) { if (atomic_load_8(&pInfo->assigned) == 0) {
(void)tsem_post(&(pRSmaStat->notEmpty)); if (tsem_post(&(pRSmaStat->notEmpty)) != 0) {
smaError("vgId:%d, failed to post notEmpty semaphore for rsma %" PRIi64 " since %s", SMA_VID(pSma), suid,
tstrerror(terrno));
}
} }
// smoothing consume // smoothing consume
@ -1385,7 +1388,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) {
bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); bool ret = taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
if (!ret) { if (!ret) {
smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8 " since tmr reset failed, rsetId:%d refId:%" PRIi64, smaWarn("vgId:%d, rsma fetch task not reset for level %" PRIi8
" since tmr reset failed, rsetId:%d refId:%" PRIi64,
SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId); SMA_VID(pSma), pItem->level, smaMgmt.rsetId, pRSmaRef->refId);
} }
} }
@ -1407,7 +1411,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
atomic_store_8(&pItem->fetchLevel, 1); atomic_store_8(&pItem->fetchLevel, 1);
if (atomic_load_8(&pRSmaInfo->assigned) == 0) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
(void)tsem_post(&(pStat->notEmpty)); if (tsem_post(&(pStat->notEmpty)) != 0) {
smaError("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since sem post failed",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
}
} }
} break; } break;
case TASK_TRIGGER_STAT_INACTIVE: { case TASK_TRIGGER_STAT_INACTIVE: {

View File

@ -199,9 +199,9 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitCloseReader(SCommitter2 *committer) { static void tsdbCommitCloseReader(SCommitter2 *committer) {
TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose); TARRAY2_CLEAR(committer->sttReaderArray, tsdbSttFileReaderClose);
return 0; return;
} }
static int32_t tsdbCommitOpenReader(SCommitter2 *committer) { static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
@ -243,19 +243,19 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
_exit: _exit:
if (code) { if (code) {
TAOS_UNUSED(tsdbCommitCloseReader(committer)); tsdbCommitCloseReader(committer);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(committer->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(committer->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
} }
static int32_t tsdbCommitCloseIter(SCommitter2 *committer) { static void tsdbCommitCloseIter(SCommitter2 *committer) {
tsdbIterMergerClose(&committer->tombIterMerger); tsdbIterMergerClose(&committer->tombIterMerger);
tsdbIterMergerClose(&committer->dataIterMerger); tsdbIterMergerClose(&committer->dataIterMerger);
TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose); TARRAY2_CLEAR(committer->tombIterArray, tsdbIterClose);
TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose); TARRAY2_CLEAR(committer->dataIterArray, tsdbIterClose);
return 0; return;
} }
static int32_t tsdbCommitOpenIter(SCommitter2 *committer) { static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
@ -309,7 +309,7 @@ static int32_t tsdbCommitOpenIter(SCommitter2 *committer) {
_exit: _exit:
if (code) { if (code) {
TAOS_UNUSED(tsdbCommitCloseIter(committer)); tsdbCommitCloseIter(committer);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(committer->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(committer->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
@ -322,7 +322,7 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
STsdb *tsdb = committer->tsdb; STsdb *tsdb = committer->tsdb;
// check if can commit // check if can commit
TAOS_UNUSED(tsdbFSCheckCommit(tsdb, committer->ctx->info->fid)); tsdbFSCheckCommit(tsdb, committer->ctx->info->fid);
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->info->fid, &tsdb->keepCfg, committer->now); committer->ctx->expLevel = tsdbFidLevel(committer->ctx->info->fid, &tsdb->keepCfg, committer->now);
tsdbFidKeyRange(committer->ctx->info->fid, committer->minutes, committer->precision, &committer->ctx->minKey, tsdbFidKeyRange(committer->ctx->info->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
@ -355,8 +355,8 @@ static int32_t tsdbCommitFileSetEnd(SCommitter2 *committer) {
int32_t lino = 0; int32_t lino = 0;
TAOS_CHECK_GOTO(tsdbCommitCloseWriter(committer), &lino, _exit); TAOS_CHECK_GOTO(tsdbCommitCloseWriter(committer), &lino, _exit);
TAOS_CHECK_GOTO(tsdbCommitCloseIter(committer), &lino, _exit); tsdbCommitCloseIter(committer);
TAOS_CHECK_GOTO(tsdbCommitCloseReader(committer), &lino, _exit); tsdbCommitCloseReader(committer);
_exit: _exit:
if (code) { if (code) {
@ -409,11 +409,11 @@ static uint32_t tFileSetCommitInfoHash(const void *arg) {
return MurmurHash3_32((const char *)&info->fid, sizeof(info->fid)); return MurmurHash3_32((const char *)&info->fid, sizeof(info->fid));
} }
static int32_t tsdbCommitInfoDestroy(STsdb *pTsdb) { static void tsdbCommitInfoDestroy(STsdb *pTsdb) {
if (pTsdb->commitInfo) { if (pTsdb->commitInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
TAOS_UNUSED(vHashDrop(pTsdb->commitInfo->ht, info)); int32_t ret = vHashDrop(pTsdb->commitInfo->ht, info);
tsdbTFileSetClear(&info->fset); tsdbTFileSetClear(&info->fset);
taosMemoryFree(info); taosMemoryFree(info);
} }
@ -423,7 +423,7 @@ static int32_t tsdbCommitInfoDestroy(STsdb *pTsdb) {
pTsdb->commitInfo->arr = NULL; pTsdb->commitInfo->arr = NULL;
taosMemoryFreeClear(pTsdb->commitInfo); taosMemoryFreeClear(pTsdb->commitInfo);
} }
return 0; return;
} }
static int32_t tsdbCommitInfoInit(STsdb *pTsdb) { static int32_t tsdbCommitInfoInit(STsdb *pTsdb) {
@ -444,7 +444,7 @@ static int32_t tsdbCommitInfoInit(STsdb *pTsdb) {
_exit: _exit:
if (code) { if (code) {
TAOS_UNUSED(tsdbCommitInfoDestroy(pTsdb)); tsdbCommitInfoDestroy(pTsdb);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} }
return code; return code;
@ -514,7 +514,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
SFileSetCommitInfo tinfo = { SFileSetCommitInfo tinfo = {
.fid = fid, .fid = fid,
}; };
TAOS_UNUSED(vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info)); int32_t ret = vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
if (info == NULL) { if (info == NULL) {
TAOS_CHECK_GOTO(tsdbCommitInfoAdd(tsdb, fid), &lino, _exit); TAOS_CHECK_GOTO(tsdbCommitInfoAdd(tsdb, fid), &lino, _exit);
} }
@ -538,7 +538,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
}; };
// check if the file set already on the commit list // check if the file set already on the commit list
TAOS_UNUSED(vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info)); int32_t ret = vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
if (info != NULL) { if (info != NULL) {
continue; continue;
} }
@ -586,7 +586,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
_exit: _exit:
if (code) { if (code) {
TAOS_UNUSED(tsdbCommitInfoDestroy(tsdb)); tsdbCommitInfoDestroy(tsdb);
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
} }
return code; return code;
@ -716,7 +716,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
(void)taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
TAOS_UNUSED(tsdbCommitInfoDestroy(tsdb)); tsdbCommitInfoDestroy(tsdb);
tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
} }
@ -745,7 +745,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
} }
} }
(void)taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
TAOS_UNUSED(tsdbCommitInfoDestroy(pTsdb)); tsdbCommitInfoDestroy(pTsdb);
_exit: _exit:
if (code) { if (code) {

View File

@ -130,7 +130,7 @@ _exit:
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
} }
taosMemoryFree(pData); taosMemoryFree(pData);
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
return code; return code;
} }
@ -300,26 +300,26 @@ static int32_t load_fs(const char *fname, STsdbFS *pFS) {
int64_t size; int64_t size;
code = taosFStatFile(pFD, &size, NULL); code = taosFStatFile(pFD, &size, NULL);
if (code != 0) { if (code != 0) {
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
pData = taosMemoryMalloc(size); pData = taosMemoryMalloc(size);
if (pData == NULL) { if (pData == NULL) {
code = terrno; code = terrno;
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (taosReadFile(pFD, pData, size) < 0) { if (taosReadFile(pFD, pData, size) < 0) {
code = terrno; code = terrno;
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (!taosCheckChecksumWhole(pData, size)) { if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -331,7 +331,7 @@ _exit:
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
} }
taosMemoryFree(pData); taosMemoryFree(pData);
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
return code; return code;
} }

View File

@ -44,7 +44,12 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
} }
fs[0]->tsdb = pTsdb; fs[0]->tsdb = pTsdb;
(void)tsem_init(&fs[0]->canEdit, 0, 1); int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
if (code) {
taosMemoryFree(fs[0]);
return code;
}
fs[0]->fsstate = TSDB_FS_STATE_NORMAL; fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
fs[0]->neid = 0; fs[0]->neid = 0;
TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArr);
@ -58,7 +63,9 @@ static void destroy_fs(STFileSystem **fs) {
TARRAY2_DESTROY(fs[0]->fSetArr, NULL); TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL); TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
(void)tsem_destroy(&fs[0]->canEdit); if (tsem_destroy(&fs[0]->canEdit) != 0) {
tsdbError("failed to destroy semaphore");
}
taosMemoryFree(fs[0]); taosMemoryFree(fs[0]);
fs[0] = NULL; fs[0] = NULL;
} }
@ -100,7 +107,7 @@ _exit:
tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code)); tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
} }
taosMemoryFree(data); taosMemoryFree(data);
(void)taosCloseFile(&fp); taosCloseFileWithLog(&fp);
return code; return code;
} }
@ -140,7 +147,7 @@ _exit:
tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code)); tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
json[0] = NULL; json[0] = NULL;
} }
(void)taosCloseFile(&fp); taosCloseFileWithLog(&fp);
taosMemoryFree(data); taosMemoryFree(data);
return code; return code;
} }
@ -803,7 +810,11 @@ void tsdbEnableBgTask(STsdb *pTsdb) {
void tsdbCloseFS(STFileSystem **fs) { void tsdbCloseFS(STFileSystem **fs) {
if (fs[0] == NULL) return; if (fs[0] == NULL) return;
TAOS_UNUSED(tsdbDisableAndCancelAllBgTask((*fs)->tsdb)); int32_t code = tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
tstrerror(code));
}
close_file_system(fs[0]); close_file_system(fs[0]);
destroy_fs(fs); destroy_fs(fs);
return; return;
@ -833,7 +844,9 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M); current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
} }
(void)tsem_wait(&fs->canEdit); if (tsem_wait(&fs->canEdit) != 0) {
tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
}
fs->etype = etype; fs->etype = etype;
// edit // edit
@ -865,7 +878,7 @@ static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
} }
} }
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) { void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
(void)taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
STFileSet *fset; STFileSet *fset;
tsdbFSGetFSet(tsdb->pFS, fid, &fset); tsdbFSGetFSet(tsdb->pFS, fid, &fset);
@ -877,7 +890,7 @@ int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
} }
} }
(void)taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
return 0; return;
} }
// IMPORTANT: the caller must hold fs->tsdb->mutex // IMPORTANT: the caller must hold fs->tsdb->mutex
@ -939,13 +952,17 @@ _exit:
} else { } else {
tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype); tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
} }
(void)tsem_post(&fs->canEdit); if (tsem_post(&fs->canEdit) != 0) {
tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
}
return code; return code;
} }
int32_t tsdbFSEditAbort(STFileSystem *fs) { int32_t tsdbFSEditAbort(STFileSystem *fs) {
int32_t code = abort_edit(fs); int32_t code = abort_edit(fs);
(void)tsem_post(&fs->canEdit); if (tsem_post(&fs->canEdit) != 0) {
tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
}
return code; return code;
} }

View File

@ -60,7 +60,7 @@ int32_t tsdbFSEditCommit(STFileSystem *fs);
int32_t tsdbFSEditAbort(STFileSystem *fs); int32_t tsdbFSEditAbort(STFileSystem *fs);
// other // other
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset); void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid); void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset); void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset);
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid); void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid);
// utils // utils

View File

@ -436,7 +436,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle); code = tsdbCacheGetPageS3(pFD->pTsdb->pgCache, pFD, pgno, &handle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (handle) { if (handle) {
(void)tsdbCacheRelease(pFD->pTsdb->pgCache, handle); tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
} }
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -447,7 +447,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle); uint8_t *pPage = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->pgCache, handle);
memcpy(pFD->pBuf, pPage, pFD->szPage); memcpy(pFD->pBuf, pPage, pFD->szPage);
(void)tsdbCacheRelease(pFD->pTsdb->pgCache, handle); tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
// check // check
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) { if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {

View File

@ -99,8 +99,12 @@ _exit:
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_from);
}
if (taosCloseFile(&fdTo) != 0) {
tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_to);
}
return code; return code;
} }
@ -136,8 +140,12 @@ _exit:
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
if (taosCloseFile(&fdTo) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -441,7 +449,9 @@ _exit:
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -541,8 +551,13 @@ _exit:
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
if (taosCloseFile(&fdTo) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -639,8 +654,12 @@ _exit:
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
if (taosCloseFile(&fdTo) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -699,7 +718,9 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
if (taosCheckExistFile(fname1)) { if (taosCheckExistFile(fname1)) {
int32_t mtime = 0; int32_t mtime = 0;
int64_t size = 0; int64_t size = 0;
(void)taosStatFile(fname1, &size, &mtime, NULL); if (taosStatFile(fname1, &size, &mtime, NULL) != 0) {
tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
}
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) { if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit); TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
} }

View File

@ -57,10 +57,10 @@ struct STsdbSnapReader {
STombBlock tombBlock[1]; STombBlock tombBlock[1];
}; };
static int32_t tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) { static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose); TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
tsdbDataFileReaderClose(&reader->dataReader); tsdbDataFileReaderClose(&reader->dataReader);
return 0; return;
} }
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) { static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
@ -112,7 +112,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
_exit: _exit:
if (code) { if (code) {
TAOS_UNUSED(tsdbSnapReadFileSetCloseReader(reader)); tsdbSnapReadFileSetCloseReader(reader);
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
} }
return code; return code;
@ -190,12 +190,12 @@ _exit:
return code; return code;
} }
static int32_t tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) { static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
tsdbIterMergerClose(&reader->dataIterMerger); tsdbIterMergerClose(&reader->dataIterMerger);
tsdbIterMergerClose(&reader->tombIterMerger); tsdbIterMergerClose(&reader->tombIterMerger);
TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose); TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose); TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
return 0; return;
} }
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) { static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
@ -222,8 +222,8 @@ _exit:
} }
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) { static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
TAOS_UNUSED(tsdbSnapReadFileSetCloseIter(reader)); tsdbSnapReadFileSetCloseIter(reader);
TAOS_UNUSED(tsdbSnapReadFileSetCloseReader(reader)); tsdbSnapReadFileSetCloseReader(reader);
reader->ctx->fsr = NULL; reader->ctx->fsr = NULL;
return 0; return 0;
} }
@ -373,7 +373,7 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
int32_t lino = 0; int32_t lino = 0;
SMetaInfo info; SMetaInfo info;
TAOS_UNUSED(tTombBlockClear(reader->tombBlock)); tTombBlockClear(reader->tombBlock);
TABLEID tbid[1] = {0}; TABLEID tbid[1] = {0};
for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) { for (STombRecord* record; (record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL;) {
@ -463,7 +463,7 @@ void tsdbSnapReaderClose(STsdbSnapReader** reader) {
tDestroyTSchema(reader[0]->skmTb->pTSchema); tDestroyTSchema(reader[0]->skmTb->pTSchema);
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
TAOS_UNUSED(tBufferDestroy(reader[0]->buffers + i)); tBufferDestroy(reader[0]->buffers + i);
} }
taosMemoryFree(reader[0]); taosMemoryFree(reader[0]);
@ -1000,7 +1000,7 @@ static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombB
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
TAOS_UNUSED(tTombBlockClear(tombBlock)); tTombBlockClear(tombBlock);
int64_t size = hdr->size; int64_t size = hdr->size;
size = size / TOMB_RECORD_ELEM_NUM; size = size / TOMB_RECORD_ELEM_NUM;

View File

@ -415,7 +415,7 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
&lino, _exit); &lino, _exit);
// decode data // decode data
TAOS_UNUSED(tStatisBlockClear(statisBlock)); tStatisBlockClear(statisBlock);
statisBlock->numOfPKs = statisBlk->numOfPKs; statisBlock->numOfPKs = statisBlk->numOfPKs;
statisBlock->numOfRecords = statisBlk->numRec; statisBlock->numOfRecords = statisBlk->numRec;
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
@ -654,7 +654,7 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(writer->statisBlkArray, &statisBlk), &lino, _exit); TAOS_CHECK_GOTO(TARRAY2_APPEND_PTR(writer->statisBlkArray, &statisBlk), &lino, _exit);
TAOS_UNUSED(tStatisBlockClear(writer->staticBlock)); tStatisBlockClear(writer->staticBlock);
_exit: _exit:
if (code) { if (code) {

View File

@ -609,7 +609,10 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
SValue value; SValue value;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
(void)tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal); int32_t ret = tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
if (ret != 0) {
tsdbError("failed to get column value, code:%d", ret);
}
} else if (pRow->type == TSDBROW_COL_FMT) { } else if (pRow->type == TSDBROW_COL_FMT) {
if (iCol == 0) { if (iCol == 0) {
*pColVal = *pColVal =

View File

@ -93,25 +93,25 @@ void tStatisBlockDestroy(STbStatisBlock *statisBlock) {
statisBlock->numOfPKs = 0; statisBlock->numOfPKs = 0;
statisBlock->numOfRecords = 0; statisBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
TAOS_UNUSED(tBufferDestroy(&statisBlock->buffers[i])); tBufferDestroy(&statisBlock->buffers[i]);
} }
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
TAOS_UNUSED(tValueColumnDestroy(&statisBlock->firstKeyPKs[i])); tValueColumnDestroy(&statisBlock->firstKeyPKs[i]);
TAOS_UNUSED(tValueColumnDestroy(&statisBlock->lastKeyPKs[i])); tValueColumnDestroy(&statisBlock->lastKeyPKs[i]);
} }
} }
int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { void tStatisBlockClear(STbStatisBlock *statisBlock) {
statisBlock->numOfPKs = 0; statisBlock->numOfPKs = 0;
statisBlock->numOfRecords = 0; statisBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
TAOS_UNUSED(tBufferClear(&statisBlock->buffers[i])); tBufferClear(&statisBlock->buffers[i]);
} }
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
tValueColumnClear(&statisBlock->firstKeyPKs[i]); tValueColumnClear(&statisBlock->firstKeyPKs[i]);
tValueColumnClear(&statisBlock->lastKeyPKs[i]); tValueColumnClear(&statisBlock->lastKeyPKs[i]);
} }
return 0; return;
} }
static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) { static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) {
@ -252,11 +252,11 @@ void tBrinBlockDestroy(SBrinBlock *brinBlock) {
brinBlock->numOfPKs = 0; brinBlock->numOfPKs = 0;
brinBlock->numOfRecords = 0; brinBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
TAOS_UNUSED(tBufferDestroy(&brinBlock->buffers[i])); tBufferDestroy(&brinBlock->buffers[i]);
} }
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
TAOS_UNUSED(tValueColumnDestroy(&brinBlock->firstKeyPKs[i])); tValueColumnDestroy(&brinBlock->firstKeyPKs[i]);
TAOS_UNUSED(tValueColumnDestroy(&brinBlock->lastKeyPKs[i])); tValueColumnDestroy(&brinBlock->lastKeyPKs[i]);
} }
} }
@ -264,7 +264,7 @@ void tBrinBlockClear(SBrinBlock *brinBlock) {
brinBlock->numOfPKs = 0; brinBlock->numOfPKs = 0;
brinBlock->numOfRecords = 0; brinBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
TAOS_UNUSED(tBufferClear(&brinBlock->buffers[i])); tBufferClear(&brinBlock->buffers[i]);
} }
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
tValueColumnClear(&brinBlock->firstKeyPKs[i]); tValueColumnClear(&brinBlock->firstKeyPKs[i]);

View File

@ -113,7 +113,7 @@ typedef struct {
int32_t tStatisBlockInit(STbStatisBlock *statisBlock); int32_t tStatisBlockInit(STbStatisBlock *statisBlock);
void tStatisBlockDestroy(STbStatisBlock *statisBlock); void tStatisBlockDestroy(STbStatisBlock *statisBlock);
int32_t tStatisBlockClear(STbStatisBlock *statisBlock); void tStatisBlockClear(STbStatisBlock *statisBlock);
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, SRowInfo *row, int32_t maxRecords); int32_t tStatisBlockPut(STbStatisBlock *statisBlock, SRowInfo *row, int32_t maxRecords);
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record); int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record);

View File

@ -187,10 +187,12 @@ static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
task->prev->next = task->next; task->prev->next = task->next;
task->next->prev = task->prev; task->next->prev = task->prev;
if (task->cancel) { if (task->cancel) {
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel, .cancel = task->cancel,
.arg = task->arg, .arg = task->arg,
})); }) == NULL) {
vError("failed to push cancel task into array");
};
} }
vnodeAsyncTaskDone(async, task); vnodeAsyncTaskDone(async, task);
} }
@ -430,7 +432,7 @@ static void vnodeAsyncLaunchWorker(SVAsync *async) {
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) { if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
continue; continue;
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) { } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
TAOS_UNUSED(taosThreadJoin(async->workers[i].thread, NULL)); int32_t ret = taosThreadJoin(async->workers[i].thread, NULL);
async->workers[i].state = EVA_WORKER_STATE_UINIT; async->workers[i].state = EVA_WORKER_STATE_UINIT;
} }
@ -748,10 +750,12 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
task->prev->next = task->next; task->prev->next = task->next;
task->next->prev = task->prev; task->next->prev = task->prev;
if (task->cancel) { if (task->cancel) {
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel, .cancel = task->cancel,
.arg = task->arg, .arg = task->arg,
})); }) == NULL) {
vError("failed to push cancel info");
};
} }
vnodeAsyncTaskDone(async, task); vnodeAsyncTaskDone(async, task);
} }
@ -763,10 +767,12 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
channel->scheduled->prev->next = channel->scheduled->next; channel->scheduled->prev->next = channel->scheduled->next;
channel->scheduled->next->prev = channel->scheduled->prev; channel->scheduled->next->prev = channel->scheduled->prev;
if (channel->scheduled->cancel) { if (channel->scheduled->cancel) {
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = channel->scheduled->cancel, .cancel = channel->scheduled->cancel,
.arg = channel->scheduled->arg, .arg = channel->scheduled->arg,
})); }) == NULL) {
vError("failed to push cancel info");
}
} }
vnodeAsyncTaskDone(async, channel->scheduled); vnodeAsyncTaskDone(async, channel->scheduled);
} }

View File

@ -201,7 +201,9 @@ _exit:
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname, vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion); pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
} }
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", pInfo->config.vgId);
}
taosMemoryFree(data); taosMemoryFree(data);
return code; return code;
} }
@ -263,7 +265,9 @@ _exit:
} }
} }
taosMemoryFree(pData); taosMemoryFree(pData);
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", pInfo->config.vgId);
}
return code; return code;
} }
@ -496,7 +500,9 @@ void vnodeRollback(SVnode *pVnode) {
offset = strlen(tFName); offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
TAOS_UNUSED(taosRemoveFile(tFName)); if (taosRemoveFile(tFName) != 0) {
vError("vgId:%d, failed to remove file %s since %s", TD_VID(pVnode), tFName, tstrerror(terrno));
}
} }
static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeEncodeState(const void *pObj, SJson *pJson) {

View File

@ -414,7 +414,10 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
pVnode->blocked = false; pVnode->blocked = false;
pVnode->disableWrite = false; pVnode->disableWrite = false;
(void)tsem_init(&pVnode->syncSem, 0, 0); if (tsem_init(&pVnode->syncSem, 0, 0) != 0) {
vError("vgId:%d, failed to init semaphore", TD_VID(pVnode));
goto _err;
}
(void)taosThreadMutexInit(&pVnode->mutex, NULL); (void)taosThreadMutexInit(&pVnode->mutex, NULL);
(void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL); (void)taosThreadCondInit(&pVnode->poolNotEmpty, NULL);

View File

@ -274,13 +274,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
int64_t size; int64_t size;
code = taosFStatFile(pFile, &size, NULL); code = taosFStatFile(pFile, &size, NULL);
if (code != 0) { if (code != 0) {
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
if (*ppData == NULL) { if (*ppData == NULL) {
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
TSDB_CHECK_CODE(code = terrno, lino, _exit); TSDB_CHECK_CODE(code = terrno, lino, _exit);
} }
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG; ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
@ -289,11 +293,15 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) { if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
taosMemoryFree(*ppData); taosMemoryFree(*ppData);
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
TSDB_CHECK_CODE(code = terrno, lino, _exit); TSDB_CHECK_CODE(code = terrno, lino, _exit);
} }
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
pReader->cfgDone = 1; pReader->cfgDone = 1;
goto _exit; goto _exit;

View File

@ -1833,7 +1833,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
} }
if (info.suid) { if (info.suid) {
(void)metaGetInfo(pVnode->pMeta, info.suid, &info, NULL); if (metaGetInfo(pVnode->pMeta, info.suid, &info, NULL) != 0) {
vWarn("vgId:%d, table uid:%" PRId64 " not exists", TD_VID(pVnode), info.suid);
}
} }
if (pSubmitTbData->sver != info.skmVer) { if (pSubmitTbData->sver != info.skmVer) {

View File

@ -28,7 +28,9 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg, vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq); TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
(void)tsem_wait(&pVnode->syncSem); if (tsem_wait(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
}
} }
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
@ -41,7 +43,9 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
pVnode->blocked = false; pVnode->blocked = false;
pVnode->blockSec = 0; pVnode->blockSec = 0;
pVnode->blockSeq = 0; pVnode->blockSeq = 0;
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post sem", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
@ -613,7 +617,9 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
if (pVnode->blocked) { if (pVnode->blocked) {
pVnode->blocked = false; pVnode->blocked = false;
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId); vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
@ -633,7 +639,9 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
if (pVnode->blocked) { if (pVnode->blocked) {
pVnode->blocked = false; pVnode->blocked = false;
vDebug("vgId:%d, become learner and post block", pVnode->config.vgId); vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
@ -766,7 +774,9 @@ void vnodeSyncPreClose(SVnode *pVnode) {
if (pVnode->blocked) { if (pVnode->blocked) {
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId); vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
pVnode->blocked = false; pVnode->blocked = false;
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post block", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
@ -801,7 +811,9 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
pVnode->blocked = false; pVnode->blocked = false;
pVnode->blockSec = 0; pVnode->blockSec = 0;
pVnode->blockSeq = 0; pVnode->blockSeq = 0;
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post block", pVnode->config.vgId);
}
} }
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);

View File

@ -232,7 +232,7 @@ int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) {
if(sem_init(psem, flags, count) == 0) { if(sem_init(psem, flags, count) == 0) {
return 0; return 0;
} else { } else {
return TAOS_SYSTEM_ERROR(errno); return terrno = TAOS_SYSTEM_ERROR(errno);
} }
} }

View File

@ -224,7 +224,9 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
(void)taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
if (queue->qset) { if (queue->qset) {
(void)tsem_post(&queue->qset->sem); if (tsem_post(&queue->qset->sem) != 0) {
uError("failed to post semaphore for queue set:%p", queue->qset);
}
} }
return code; return code;
} }
@ -333,7 +335,10 @@ int32_t taosOpenQset(STaosQset **qset) {
} }
(void)taosThreadMutexInit(&(*qset)->mutex, NULL); (void)taosThreadMutexInit(&(*qset)->mutex, NULL);
(void)tsem_init(&(*qset)->sem, 0, 0); if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
taosMemoryFree(*qset);
return terrno;
}
uDebug("qset:%p is opened", qset); uDebug("qset:%p is opened", qset);
return 0; return 0;
@ -354,7 +359,9 @@ void taosCloseQset(STaosQset *qset) {
(void)taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
(void)taosThreadMutexDestroy(&qset->mutex); (void)taosThreadMutexDestroy(&qset->mutex);
(void)tsem_destroy(&qset->sem); if (tsem_destroy(&qset->sem) != 0) {
uError("failed to destroy semaphore for qset:%p", qset);
}
taosMemoryFree(qset); taosMemoryFree(qset);
uDebug("qset:%p is closed", qset); uDebug("qset:%p is closed", qset);
} }
@ -364,7 +371,9 @@ void taosCloseQset(STaosQset *qset) {
// thread to exit. // thread to exit.
void taosQsetThreadResume(STaosQset *qset) { void taosQsetThreadResume(STaosQset *qset) {
uDebug("qset:%p, it will exit", qset); uDebug("qset:%p, it will exit", qset);
(void)tsem_post(&qset->sem); if (tsem_post(&qset->sem) != 0) {
uError("failed to post semaphore for qset:%p", qset);
}
} }
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) { int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
@ -432,7 +441,9 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
(void)tsem_wait(&qset->sem); if (tsem_wait(&qset->sem) != 0) {
uError("failed to wait semaphore for qset:%p", qset);
}
(void)taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
@ -476,7 +487,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
STaosQueue *queue; STaosQueue *queue;
int32_t code = 0; int32_t code = 0;
(void)tsem_wait(&qset->sem); if (tsem_wait(&qset->sem) != 0) {
uError("failed to wait semaphore for qset:%p", qset);
}
(void)taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) { for (int32_t i = 0; i < qset->numOfQueues; ++i) {
@ -510,7 +523,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
(void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) { for (int32_t j = 1; j < qall->numOfItems; ++j) {
(void)tsem_wait(&qset->sem); if (tsem_wait(&qset->sem) != 0) {
uError("failed to wait semaphore for qset:%p", qset);
}
} }
} }