enh: refactor return code

This commit is contained in:
Hongze Cheng 2024-07-27 18:28:26 +08:00
parent 3727739721
commit ded23d4b08
24 changed files with 324 additions and 312 deletions

View File

@ -260,7 +260,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
return code; return code;
} }
taosThreadMutexLock(&pDnode->mutex); (void)taosThreadMutexLock(&pDnode->mutex);
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
dInfo("node:%s, start to create", pWrapper->name); dInfo("node:%s, start to create", pWrapper->name);
@ -277,7 +277,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
pWrapper->required = true; pWrapper->required = true;
} }
taosThreadMutexUnlock(&pDnode->mutex); (void)taosThreadMutexUnlock(&pDnode->mutex);
return code; return code;
} }
@ -316,7 +316,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name); dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);
taosThreadMutexLock(&pDnode->mutex); (void)taosThreadMutexLock(&pDnode->mutex);
dInfo("node:%s, stopping node", pWrapper->name); dInfo("node:%s, stopping node", pWrapper->name);
dmStopNode(pWrapper); dmStopNode(pWrapper);
@ -325,7 +325,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
pWrapper = &pDnode->wrappers[ntype]; pWrapper = &pDnode->wrappers[ntype];
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
taosThreadMutexUnlock(&pDnode->mutex); (void)taosThreadMutexUnlock(&pDnode->mutex);
code = terrno; code = terrno;
dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code)); dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
return code; return code;
@ -347,7 +347,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
pWrapper->required = true; pWrapper->required = true;
} }
taosThreadMutexUnlock(&pDnode->mutex); (void)taosThreadMutexUnlock(&pDnode->mutex);
return code; return code;
} }
@ -375,7 +375,7 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
return terrno = code; return terrno = code;
} }
taosThreadMutexLock(&pDnode->mutex); (void)taosThreadMutexLock(&pDnode->mutex);
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
dInfo("node:%s, start to drop", pWrapper->name); dInfo("node:%s, start to drop", pWrapper->name);
@ -395,7 +395,7 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
dmCloseNode(pWrapper); dmCloseNode(pWrapper);
taosRemoveDir(pWrapper->path); taosRemoveDir(pWrapper->path);
} }
taosThreadMutexUnlock(&pDnode->mutex); (void)taosThreadMutexUnlock(&pDnode->mutex);
return code; return code;
} }

View File

@ -530,12 +530,12 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
uint64_t key[4]; uint64_t key[4];
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen); initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
taosThreadMutexLock(pLock); (void)taosThreadMutexLock(pLock);
pMeta->pCache->sTagFilterResCache.accTimes += 1; pMeta->pCache->sTagFilterResCache.accTimes += 1;
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN); LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
if (pHandle == NULL) { if (pHandle == NULL) {
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -565,7 +565,7 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
taosLRUCacheRelease(pCache, pHandle, false); taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta // unlock meta
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -652,7 +652,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
uint64_t key[4] = {0}; uint64_t key[4] = {0};
initCacheKey(key, pTableEntry, suid, pKey, keyLen); initCacheKey(key, pTableEntry, suid, pKey, keyLen);
taosThreadMutexLock(pLock); (void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) { if (pEntry == NULL) {
code = addNewEntry(pTableEntry, pKey, keyLen, suid); code = addNewEntry(pTableEntry, pKey, keyLen, suid);
@ -668,7 +668,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
uint64_t* p = (uint64_t*)pNode->data; uint64_t* p = (uint64_t*)pNode->data;
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) { if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
// we have already found the existed items, no need to added to cache anymore. // we have already found the existed items, no need to added to cache anymore.
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // not equal, append it } else { // not equal, append it
tdListAppend(&(*pEntry)->list, pKey); tdListAppend(&(*pEntry)->list, pKey);
@ -680,7 +680,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW, NULL); TAOS_LRU_PRIORITY_LOW, NULL);
_end: _end:
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid, metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
@ -697,11 +697,11 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16); initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock; TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
taosThreadMutexLock(pLock); (void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t)); STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) { if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -717,7 +717,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
} }
tdListEmpty(&(*pEntry)->list); tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid); metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -736,12 +736,12 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
uint64_t key[4]; uint64_t key[4];
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen); initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
taosThreadMutexLock(pLock); (void)taosThreadMutexLock(pLock);
pMeta->pCache->STbGroupResCache.accTimes += 1; pMeta->pCache->STbGroupResCache.accTimes += 1;
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN); LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
if (pHandle == NULL) { if (pHandle == NULL) {
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -764,7 +764,7 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
taosLRUCacheRelease(pCache, pHandle, false); taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta // unlock meta
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -829,7 +829,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
uint64_t key[4] = {0}; uint64_t key[4] = {0};
initCacheKey(key, pTableEntry, suid, pKey, keyLen); initCacheKey(key, pTableEntry, suid, pKey, keyLen);
taosThreadMutexLock(pLock); (void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t)); STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) { if (pEntry == NULL) {
code = addNewEntry(pTableEntry, pKey, keyLen, suid); code = addNewEntry(pTableEntry, pKey, keyLen, suid);
@ -845,7 +845,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
uint64_t* p = (uint64_t*)pNode->data; uint64_t* p = (uint64_t*)pNode->data;
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) { if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
// we have already found the existed items, no need to added to cache anymore. // we have already found the existed items, no need to added to cache anymore.
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // not equal, append it } else { // not equal, append it
tdListAppend(&(*pEntry)->list, pKey); tdListAppend(&(*pEntry)->list, pKey);
@ -857,7 +857,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
TAOS_LRU_PRIORITY_LOW, NULL); TAOS_LRU_PRIORITY_LOW, NULL);
_end: _end:
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid, metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry)); (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
@ -874,11 +874,11 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16); initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock; TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
taosThreadMutexLock(pLock); (void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t)); STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) { if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -894,7 +894,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
} }
tdListEmpty(&(*pEntry)->list); tdListEmpty(&(*pEntry)->list);
taosThreadMutexUnlock(pLock); (void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid); metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -540,7 +540,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
} }
} }
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
// scan tomb data // scan tomb data
if (tsdb->imem->nDel > 0) { if (tsdb->imem->nDel > 0) {
@ -572,7 +572,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
} else { } else {
hasDataToCommit = true; hasDataToCommit = true;
if ((code = tsdbCommitInfoAdd(tsdb, fset->fid))) { if ((code = tsdbCommitInfoAdd(tsdb, fset->fid))) {
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
break; break;
@ -593,13 +593,13 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
if (fset) { if (fset) {
code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset); code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
if (code) { if (code) {
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
TAOS_CHECK_GOTO(code, &lino, _exit); TAOS_CHECK_GOTO(code, &lino, _exit);
} }
} }
} }
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
_exit: _exit:
if (code) { if (code) {
@ -667,11 +667,11 @@ _exit:
} }
int32_t tsdbPreCommit(STsdb *tsdb) { int32_t tsdbPreCommit(STsdb *tsdb) {
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
ASSERT_CORE(tsdb->imem == NULL, "imem should be null to commit mem"); ASSERT_CORE(tsdb->imem == NULL, "imem should be null to commit mem");
tsdb->imem = tsdb->mem; tsdb->imem = tsdb->mem;
tsdb->mem = NULL; tsdb->mem = NULL;
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
return 0; return 0;
} }
@ -686,9 +686,9 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
int64_t nDel = imem->nDel; int64_t nDel = imem->nDel;
if (nRow == 0 && nDel == 0) { if (nRow == 0 && nDel == 0) {
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
tsdb->imem = NULL; tsdb->imem = NULL;
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(imem, NULL, true); tsdbUnrefMemTable(imem, NULL, true);
} else { } else {
SCommitter2 committer = {0}; SCommitter2 committer = {0};
@ -719,10 +719,10 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
if (tsdb->imem) { if (tsdb->imem) {
SMemTable *pMemTable = tsdb->imem; SMemTable *pMemTable = tsdb->imem;
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
if ((code = tsdbFSEditCommit(tsdb->pFS))) { if ((code = tsdbFSEditCommit(tsdb->pFS))) {
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
tsdb->imem = NULL; tsdb->imem = NULL;
@ -734,7 +734,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
} }
} }
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
tsdbCommitInfoDestroy(tsdb); tsdbCommitInfoDestroy(tsdb);
tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
@ -757,14 +757,14 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
TAOS_CHECK_GOTO(tsdbFSEditAbort(pTsdb->pFS), &lino, _exit); TAOS_CHECK_GOTO(tsdbFSEditAbort(pTsdb->pFS), &lino, _exit);
taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
if (info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(pTsdb, info->fid); tsdbFinishTaskOnFileSet(pTsdb, info->fid);
} }
} }
taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
tsdbCommitInfoDestroy(pTsdb); tsdbCommitInfoDestroy(pTsdb);
_exit: _exit:

View File

@ -761,7 +761,7 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
// disable // disable
pTsdb->bgTaskDisabled = true; pTsdb->bgTaskDisabled = true;
@ -770,7 +770,11 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
STFileSet *fset; STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->channelOpened) { if (fset->channelOpened) {
taosArrayPush(channelArray, &fset->channel); if (taosArrayPush(channelArray, &fset->channel) == NULL) {
taosArrayDestroy(channelArray);
(void)taosThreadMutexUnlock(&pTsdb->mutex);
return terrno;
}
fset->channel = (SVAChannelID){0}; fset->channel = (SVAChannelID){0};
fset->mergeScheduled = false; fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false); tsdbFSSetBlockCommit(fset, false);
@ -778,7 +782,7 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
} }
} }
taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
// destroy all channels // destroy all channels
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) { for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
@ -794,9 +798,9 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
} }
int32_t tsdbEnableBgTask(STsdb *pTsdb) { int32_t tsdbEnableBgTask(STsdb *pTsdb) {
taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
pTsdb->bgTaskDisabled = false; pTsdb->bgTaskDisabled = false;
taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
return 0; return 0;
} }
@ -810,16 +814,16 @@ int32_t tsdbCloseFS(STFileSystem **fs) {
} }
int64_t tsdbFSAllocEid(STFileSystem *fs) { int64_t tsdbFSAllocEid(STFileSystem *fs) {
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
int64_t cid = ++fs->neid; int64_t cid = ++fs->neid;
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
return cid; return cid;
} }
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) { void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
fs->neid = TMAX(fs->neid, cid); fs->neid = TMAX(fs->neid, cid);
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
} }
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) { int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
@ -867,7 +871,7 @@ static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
} }
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) { int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
STFileSet *fset; STFileSet *fset;
tsdbFSGetFSet(tsdb->pFS, fid, &fset); tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset) { if (fset) {
@ -877,7 +881,7 @@ int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
fset->numWaitCommit--; fset->numWaitCommit--;
} }
} }
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
return 0; return 0;
} }
@ -969,7 +973,7 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
TARRAY2_INIT(fsetArr[0]); TARRAY2_INIT(fsetArr[0]);
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1); code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
if (code) break; if (code) break;
@ -977,7 +981,7 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
code = TARRAY2_APPEND(fsetArr[0], fset1); code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) break; if (code) break;
} }
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) { if (code) {
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear); TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
@ -997,9 +1001,9 @@ int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
} }
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) { int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr); int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
return code; return code;
} }
@ -1071,7 +1075,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa
} }
} }
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t ever = VERSION_MAX; int64_t ever = VERSION_MAX;
if (pHash) { if (pHash) {
@ -1088,7 +1092,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa
code = TARRAY2_APPEND(fsetArr[0], fset1); code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) break; if (code) break;
} }
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
_out: _out:
if (code) { if (code) {
@ -1127,7 +1131,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
} }
} }
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t sver1 = sver; int64_t sver1 = sver;
int64_t ever1 = ever; int64_t ever1 = ever;
@ -1156,7 +1160,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
fsr1 = NULL; fsr1 = NULL;
} }
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) { if (code) {
tsdbTFileSetRangeClear(&fsr1); tsdbTFileSetRangeClear(&fsr1);

View File

@ -230,11 +230,11 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
return terrno; return terrno;
} }
taosThreadMutexInit(&fobj[0]->mutex, NULL); (void)taosThreadMutexInit(&fobj[0]->mutex, NULL);
fobj[0]->f[0] = f[0]; fobj[0]->f[0] = f[0];
fobj[0]->state = TSDB_FSTATE_LIVE; fobj[0]->state = TSDB_FSTATE_LIVE;
fobj[0]->ref = 1; fobj[0]->ref = 1;
tsdbTFileName(pTsdb, f, fobj[0]->fname); (void)tsdbTFileName(pTsdb, f, fobj[0]->fname);
// fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); // fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
fobj[0]->nlevel = vnodeNodeId(pTsdb->pVnode); fobj[0]->nlevel = vnodeNodeId(pTsdb->pVnode);
return 0; return 0;
@ -242,18 +242,18 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
int32_t tsdbTFileObjRef(STFileObj *fobj) { int32_t tsdbTFileObjRef(STFileObj *fobj) {
int32_t nRef; int32_t nRef;
taosThreadMutexLock(&fobj->mutex); (void)(void)taosThreadMutexLock(&fobj->mutex);
ASSERT(fobj->ref > 0 && fobj->state == TSDB_FSTATE_LIVE); ASSERT(fobj->ref > 0 && fobj->state == TSDB_FSTATE_LIVE);
nRef = ++fobj->ref; nRef = ++fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); (void)(void)taosThreadMutexUnlock(&fobj->mutex);
tsdbTrace("ref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); tsdbTrace("ref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
return 0; return 0;
} }
int32_t tsdbTFileObjUnref(STFileObj *fobj) { int32_t tsdbTFileObjUnref(STFileObj *fobj) {
taosThreadMutexLock(&fobj->mutex); (void)(void)taosThreadMutexLock(&fobj->mutex);
int32_t nRef = --fobj->ref; int32_t nRef = --fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); (void)(void)taosThreadMutexUnlock(&fobj->mutex);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
if (nRef == 0) { if (nRef == 0) {
@ -318,11 +318,11 @@ static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
} }
int32_t tsdbTFileObjRemove(STFileObj *fobj) { int32_t tsdbTFileObjRemove(STFileObj *fobj) {
taosThreadMutexLock(&fobj->mutex); (void)taosThreadMutexLock(&fobj->mutex);
ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0); ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0);
fobj->state = TSDB_FSTATE_DEAD; fobj->state = TSDB_FSTATE_DEAD;
int32_t nRef = --fobj->ref; int32_t nRef = --fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); (void)taosThreadMutexUnlock(&fobj->mutex);
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
if (nRef == 0) { if (nRef == 0) {
tsdbTFileObjRemoveLC(fobj, true); tsdbTFileObjRemoveLC(fobj, true);
@ -332,11 +332,11 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) {
} }
int32_t tsdbTFileObjRemoveUpdateLC(STFileObj *fobj) { int32_t tsdbTFileObjRemoveUpdateLC(STFileObj *fobj) {
taosThreadMutexLock(&fobj->mutex); (void)taosThreadMutexLock(&fobj->mutex);
ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0); ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0);
fobj->state = TSDB_FSTATE_DEAD; fobj->state = TSDB_FSTATE_DEAD;
int32_t nRef = --fobj->ref; int32_t nRef = --fobj->ref;
taosThreadMutexUnlock(&fobj->mutex); (void)taosThreadMutexUnlock(&fobj->mutex);
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
if (nRef == 0) { if (nRef == 0) {
tsdbTFileObjRemoveLC(fobj, false); tsdbTFileObjRemoveLC(fobj, false);

View File

@ -375,13 +375,13 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
// edit file system // edit file system
TAOS_CHECK_GOTO(tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE), &lino, _exit); TAOS_CHECK_GOTO(tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE), &lino, _exit);
taosThreadMutexLock(&merger->tsdb->mutex); (void)taosThreadMutexLock(&merger->tsdb->mutex);
code = tsdbFSEditCommit(merger->tsdb->pFS); code = tsdbFSEditCommit(merger->tsdb->pFS);
if (code) { if (code) {
taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
_exit: _exit:
if (code) { if (code) {
@ -478,10 +478,10 @@ _exit:
static int32_t tsdbMergeGetFSet(SMerger *merger) { static int32_t tsdbMergeGetFSet(SMerger *merger) {
STFileSet *fset; STFileSet *fset;
taosThreadMutexLock(&merger->tsdb->mutex); (void)taosThreadMutexLock(&merger->tsdb->mutex);
tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset); tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
if (fset == NULL) { if (fset == NULL) {
taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0; return 0;
} }
@ -489,10 +489,10 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) {
int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset); int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
if (code) { if (code) {
taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return code; return code;
} }
taosThreadMutexUnlock(&merger->tsdb->mutex); (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0; return 0;
} }

View File

@ -105,10 +105,10 @@ int32_t tsdbClose(STsdb **pTsdb) {
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path, tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2, pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2,
pdb->keepCfg.keepTimeOffset); pdb->keepCfg.keepTimeOffset);
taosThreadMutexLock(&(*pTsdb)->mutex); (void)taosThreadMutexLock(&(*pTsdb)->mutex);
tsdbMemTableDestroy((*pTsdb)->mem, true); tsdbMemTableDestroy((*pTsdb)->mem, true);
(*pTsdb)->mem = NULL; (*pTsdb)->mem = NULL;
taosThreadMutexUnlock(&(*pTsdb)->mutex); (void)taosThreadMutexUnlock(&(*pTsdb)->mutex);
tsdbCloseFS(&(*pTsdb)->pFS); tsdbCloseFS(&(*pTsdb)->pFS);
tsdbCloseCache(*pTsdb); tsdbCloseCache(*pTsdb);

View File

@ -209,15 +209,15 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
if (TARRAY2_SIZE(&rtner->fopArr) > 0) { if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit); TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit);
taosThreadMutexLock(&rtner->tsdb->mutex); (void)taosThreadMutexLock(&rtner->tsdb->mutex);
code = tsdbFSEditCommit(rtner->tsdb->pFS); code = tsdbFSEditCommit(rtner->tsdb->pFS);
if (code) { if (code) {
taosThreadMutexUnlock(&rtner->tsdb->mutex); (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosThreadMutexUnlock(&rtner->tsdb->mutex); (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
TARRAY2_DESTROY(&rtner->fopArr, NULL); TARRAY2_DESTROY(&rtner->fopArr, NULL);
} }
@ -315,13 +315,13 @@ static int32_t tsdbRetention(void *arg) {
}; };
// begin task // begin task
taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset); tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset);
if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) { if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
// do retention // do retention
if (rtner.fset) { if (rtner.fset) {
@ -336,9 +336,9 @@ static int32_t tsdbRetention(void *arg) {
_exit: _exit:
if (rtner.fset) { if (rtner.fset) {
taosThreadMutexLock(&pTsdb->mutex); (void)taosThreadMutexLock(&pTsdb->mutex);
tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid); tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
taosThreadMutexUnlock(&pTsdb->mutex); (void)taosThreadMutexUnlock(&pTsdb->mutex);
} }
// clear resources // clear resources
@ -387,9 +387,9 @@ _exit:
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) { int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
code = tsdbAsyncRetentionImpl(tsdb, now, false); code = tsdbAsyncRetentionImpl(tsdb, now, false);
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
return code; return code;
} }
@ -735,9 +735,9 @@ int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
return 0; return 0;
} }
taosThreadMutexLock(&tsdb->mutex); (void)taosThreadMutexLock(&tsdb->mutex);
code = tsdbAsyncRetentionImpl(tsdb, now, true); code = tsdbAsyncRetentionImpl(tsdb, now, true);
taosThreadMutexUnlock(&tsdb->mutex); (void)taosThreadMutexUnlock(&tsdb->mutex);
if (code) { if (code) {
tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code)); tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));

View File

@ -351,7 +351,7 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
} }
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&fs->tsdb->mutex); (void)taosThreadMutexLock(&fs->tsdb->mutex);
STFileSet* fset; STFileSet* fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
STsdbFSetPartition* pItem = NULL; STsdbFSetPartition* pItem = NULL;
@ -364,7 +364,7 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
code = TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn); code = TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn);
ASSERT(code == 0); ASSERT(code == 0);
} }
taosThreadMutexUnlock(&fs->tsdb->mutex); (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) { if (code) {
TARRAY2_DESTROY(pList, tsdbFSetPartitionClear); TARRAY2_DESTROY(pList, tsdbFSetPartitionClear);

View File

@ -1125,17 +1125,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
code = tsdbFSEditAbort(writer[0]->tsdb->pFS); code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
taosThreadMutexLock(&writer[0]->tsdb->mutex); (void)taosThreadMutexLock(&writer[0]->tsdb->mutex);
code = tsdbFSEditCommit(writer[0]->tsdb->pFS); code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
if (code) { if (code) {
taosThreadMutexUnlock(&writer[0]->tsdb->mutex); (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
taosThreadMutexUnlock(&writer[0]->tsdb->mutex); (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
} }
tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger); tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);

View File

@ -485,17 +485,17 @@ int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** writer, int8_t rollback) {
code = tsdbFSEditAbort(writer[0]->tsdb->pFS); code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
taosThreadMutexLock(&writer[0]->tsdb->mutex); (void)taosThreadMutexLock(&writer[0]->tsdb->mutex);
code = tsdbFSEditCommit(writer[0]->tsdb->pFS); code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
if (code) { if (code) {
taosThreadMutexUnlock(&writer[0]->tsdb->mutex); (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
taosThreadMutexUnlock(&writer[0]->tsdb->mutex); (void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
} }
TARRAY2_DESTROY(writer[0]->fopArr, NULL); TARRAY2_DESTROY(writer[0]->fopArr, NULL);

View File

@ -213,7 +213,7 @@ static void *vnodeAsyncLoop(void *arg) {
setThreadName(async->label); setThreadName(async->label);
for (;;) { for (;;) {
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
// finish last running task // finish last running task
if (worker->runningTask != NULL) { if (worker->runningTask != NULL) {
@ -228,7 +228,7 @@ static void *vnodeAsyncLoop(void *arg) {
} }
worker->state = EVA_WORKER_STATE_STOP; worker->state = EVA_WORKER_STATE_STOP;
async->numLaunchWorkers--; async->numLaunchWorkers--;
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
goto _exit; goto _exit;
} }
@ -268,7 +268,7 @@ static void *vnodeAsyncLoop(void *arg) {
} }
} }
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
// do run the task // do run the task
worker->runningTask->execute(worker->runningTask->arg); worker->runningTask->execute(worker->runningTask->arg);
@ -387,16 +387,16 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
} }
// set stop and broadcast // set stop and broadcast
taosThreadMutexLock(&(*async)->mutex); (void)taosThreadMutexLock(&(*async)->mutex);
(*async)->stop = true; (*async)->stop = true;
taosThreadCondBroadcast(&(*async)->hasTask); taosThreadCondBroadcast(&(*async)->hasTask);
taosThreadMutexUnlock(&(*async)->mutex); (void)taosThreadMutexUnlock(&(*async)->mutex);
// join all workers // join all workers
for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) { for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
taosThreadMutexLock(&(*async)->mutex); (void)taosThreadMutexLock(&(*async)->mutex);
EVWorkerState state = (*async)->workers[i].state; EVWorkerState state = (*async)->workers[i].state;
taosThreadMutexUnlock(&(*async)->mutex); (void)taosThreadMutexUnlock(&(*async)->mutex);
if (state == EVA_WORKER_STATE_UINIT) { if (state == EVA_WORKER_STATE_UINIT) {
continue; continue;
@ -504,7 +504,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
taosThreadCondInit(&task->waitCond, NULL); taosThreadCondInit(&task->waitCond, NULL);
// schedule task // schedule task
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
if (channelID->id == 0) { if (channelID->id == 0) {
task->channel = NULL; task->channel = NULL;
@ -514,7 +514,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
}; };
vHashGet(async->channelTable, &channel, (void **)&task->channel); vHashGet(async->channelTable, &channel, (void **)&task->channel);
if (task->channel == NULL) { if (task->channel == NULL) {
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
taosThreadCondDestroy(&task->waitCond); taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task); taosMemoryFree(task);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
@ -526,7 +526,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
// add task to hash table // add task to hash table
int32_t ret = vHashPut(async->taskTable, task); int32_t ret = vHashPut(async->taskTable, task);
if (ret != 0) { if (ret != 0) {
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
taosThreadCondDestroy(&task->waitCond); taosThreadCondDestroy(&task->waitCond);
taosMemoryFree(task); taosMemoryFree(task);
return ret; return ret;
@ -580,7 +580,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
task->prev->next = task; task->prev->next = task;
} }
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
if (taskID != NULL) { if (taskID != NULL) {
taskID->async = channelID->async; taskID->async = channelID->async;
@ -601,7 +601,7 @@ int32_t vnodeAWait(SVATaskID *taskID) {
.taskId = taskID->id, .taskId = taskID->id,
}; };
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
vHashGet(async->taskTable, &task2, (void **)&task); vHashGet(async->taskTable, &task2, (void **)&task);
if (task) { if (task) {
@ -615,7 +615,7 @@ int32_t vnodeAWait(SVATaskID *taskID) {
} }
} }
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
return 0; return 0;
} }
@ -634,7 +634,7 @@ int32_t vnodeACancel(SVATaskID *taskID) {
void (*cancel)(void *) = NULL; void (*cancel)(void *) = NULL;
void *arg = NULL; void *arg = NULL;
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
vHashGet(async->taskTable, &task2, (void **)&task); vHashGet(async->taskTable, &task2, (void **)&task);
if (task) { if (task) {
@ -649,7 +649,7 @@ int32_t vnodeACancel(SVATaskID *taskID) {
} }
} }
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
if (cancel) { if (cancel) {
cancel(arg); cancel(arg);
@ -663,12 +663,12 @@ int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
SVAsync *async = vnodeAsyncs[asyncID]; SVAsync *async = vnodeAsyncs[asyncID];
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
async->numWorkers = numWorkers; async->numWorkers = numWorkers;
if (async->numIdleWorkers > 0) { if (async->numIdleWorkers > 0) {
taosThreadCondBroadcast(&async->hasTask); taosThreadCondBroadcast(&async->hasTask);
} }
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
return 0; return 0;
} }
@ -693,14 +693,14 @@ int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
channel->scheduled = NULL; channel->scheduled = NULL;
// register channel // register channel
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
channel->channelId = channelID->id = ++async->nextChannelId; channel->channelId = channelID->id = ++async->nextChannelId;
// add to hash table // add to hash table
int32_t ret = vHashPut(async->channelTable, channel); int32_t ret = vHashPut(async->channelTable, channel);
if (ret != 0) { if (ret != 0) {
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
taosMemoryFree(channel); taosMemoryFree(channel);
return ret; return ret;
} }
@ -713,7 +713,7 @@ int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
async->numChannels++; async->numChannels++;
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
channelID->async = asyncID; channelID->async = asyncID;
return 0; return 0;
@ -734,7 +734,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosThreadMutexLock(&async->mutex); (void)taosThreadMutexLock(&async->mutex);
vHashGet(async->channelTable, &channel2, (void **)&channel); vHashGet(async->channelTable, &channel2, (void **)&channel);
if (channel) { if (channel) {
@ -793,7 +793,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
} }
} }
taosThreadMutexUnlock(&async->mutex); (void)taosThreadMutexUnlock(&async->mutex);
for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) { for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i); SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
cancel->cancel(cancel->arg); cancel->cancel(cancel->arg);

View File

@ -242,7 +242,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
SVnode *pVnode = pPool->pVnode; SVnode *pVnode = pPool->pVnode;
if (proactive) taosThreadMutexLock(&pVnode->mutex); if (proactive) {
(void)taosThreadMutexLock(&pVnode->mutex);
}
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit; if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
@ -267,12 +269,14 @@ void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
vnodeBufPoolAddToFreeList(pPool); vnodeBufPoolAddToFreeList(pPool);
_exit: _exit:
if (proactive) taosThreadMutexUnlock(&pVnode->mutex); if (proactive) {
(void)taosThreadMutexUnlock(&pVnode->mutex);
}
return; return;
} }
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) { int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
taosThreadMutexLock(&pPool->mutex); (void)taosThreadMutexLock(&pPool->mutex);
pQNode->pNext = pPool->qList.pNext; pQNode->pNext = pPool->qList.pNext;
pQNode->ppNext = &pPool->qList.pNext; pQNode->ppNext = &pPool->qList.pNext;
@ -280,20 +284,24 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
pPool->qList.pNext = pQNode; pPool->qList.pNext = pQNode;
pPool->nQuery++; pPool->nQuery++;
taosThreadMutexUnlock(&pPool->mutex); (void)taosThreadMutexUnlock(&pPool->mutex);
return 0; return 0;
} }
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) { void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
int32_t code = 0; int32_t code = 0;
if (proactive) taosThreadMutexLock(&pPool->mutex); if (proactive) {
(void)taosThreadMutexLock(&pPool->mutex);
}
pQNode->pNext->ppNext = pQNode->ppNext; pQNode->pNext->ppNext = pQNode->ppNext;
*pQNode->ppNext = pQNode->pNext; *pQNode->ppNext = pQNode->pNext;
pPool->nQuery--; pPool->nQuery--;
if (proactive) taosThreadMutexUnlock(&pPool->mutex); if (proactive) {
(void)taosThreadMutexUnlock(&pPool->mutex);
}
} }
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) { int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
@ -303,7 +311,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id); vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
taosThreadMutexLock(&pPool->mutex); (void)taosThreadMutexLock(&pPool->mutex);
SQueryNode *pNode = pPool->qList.pNext; SQueryNode *pNode = pPool->qList.pNext;
while (pNode != &pPool->qList) { while (pNode != &pPool->qList) {
@ -319,6 +327,6 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
} }
_exit: _exit:
taosThreadMutexUnlock(&pPool->mutex); (void)taosThreadMutexUnlock(&pPool->mutex);
return code; return code;
} }

View File

@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
taosThreadMutexLock(&pVnode->mutex); (void)taosThreadMutexLock(&pVnode->mutex);
int32_t nTry = 0; int32_t nTry = 0;
for (;;) { for (;;) {
@ -110,7 +110,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
} }
_exit: _exit:
taosThreadMutexUnlock(&pVnode->mutex); (void)taosThreadMutexUnlock(&pVnode->mutex);
if (code) { if (code) {
vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} }
@ -150,7 +150,7 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
bool diskAvail = osDataSpaceAvailable(); bool diskAvail = osDataSpaceAvailable();
bool needCommit = false; bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex); (void)taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) { if (pVnode->inUse && diskAvail) {
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) ||
(atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed || (atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed ||
@ -162,7 +162,7 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0, TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0,
pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied, pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied,
pVnode->state.committed); pVnode->state.committed);
taosThreadMutexUnlock(&pVnode->mutex); (void)taosThreadMutexUnlock(&pVnode->mutex);
return needCommit; return needCommit;
} }
@ -299,11 +299,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
code = smaPrepareAsyncCommit(pVnode->pSma); code = smaPrepareAsyncCommit(pVnode->pSma);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
taosThreadMutexLock(&pVnode->mutex); (void)taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL); ASSERT(pVnode->onCommit == NULL);
pVnode->onCommit = pVnode->inUse; pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL; pVnode->inUse = NULL;
taosThreadMutexUnlock(&pVnode->mutex); (void)taosThreadMutexUnlock(&pVnode->mutex);
_exit: _exit:
if (code) { if (code) {
@ -316,7 +316,7 @@ _exit:
return code; return code;
} }
static void vnodeReturnBufPool(SVnode *pVnode) { static void vnodeReturnBufPool(SVnode *pVnode) {
taosThreadMutexLock(&pVnode->mutex); (void)taosThreadMutexLock(&pVnode->mutex);
SVBufPool *pPool = pVnode->onCommit; SVBufPool *pPool = pVnode->onCommit;
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1); int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
@ -340,7 +340,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
ASSERT(0); ASSERT(0);
} }
taosThreadMutexUnlock(&pVnode->mutex); (void)taosThreadMutexUnlock(&pVnode->mutex);
} }
static int32_t vnodeCommit(void *arg) { static int32_t vnodeCommit(void *arg) {
int32_t code = 0; int32_t code = 0;

View File

@ -34,7 +34,7 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) { if (vnodeIsMsgBlock(pMsg->msgType)) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) { if (pVnode->blocked) {
vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg, vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq); TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
@ -43,7 +43,7 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
pVnode->blockSeq = 0; pVnode->blockSeq = 0;
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
} }
@ -113,7 +113,7 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0; int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) { if (wait) {
@ -122,7 +122,7 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq; pVnode->blockSeq = seq;
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) { if (code > 0) {
vnodeHandleWriteMsg(pVnode, pMsg); vnodeHandleWriteMsg(pVnode, pMsg);
@ -171,14 +171,14 @@ static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool
if (*arrSize <= 0) return; if (*arrSize <= 0) return;
SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1]; SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize); int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
bool wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType)); bool wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
if (wait) { if (wait) {
ASSERT(!pVnode->blocked); ASSERT(!pVnode->blocked);
pVnode->blocked = true; pVnode->blocked = true;
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) { if (code > 0) {
for (int32_t i = 0; i < *arrSize; ++i) { for (int32_t i = 0; i < *arrSize; ++i) {
@ -598,13 +598,13 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, become follower", pVnode->config.vgId); vInfo("vgId:%d, become follower", pVnode->config.vgId);
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) { if (pVnode->blocked) {
pVnode->blocked = false; pVnode->blocked = false;
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId); vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
if (pVnode->pTq) { if (pVnode->pTq) {
tqUpdateNodeStage(pVnode->pTq, false); tqUpdateNodeStage(pVnode->pTq, false);
@ -616,13 +616,13 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, become learner", pVnode->config.vgId); vInfo("vgId:%d, become learner", pVnode->config.vgId);
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) { if (pVnode->blocked) {
pVnode->blocked = false; pVnode->blocked = false;
vDebug("vgId:%d, become learner and post block", pVnode->config.vgId); vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
static void vnodeBecomeLeader(const SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
@ -746,13 +746,13 @@ void vnodeSyncPreClose(SVnode *pVnode) {
syncLeaderTransfer(pVnode->sync); syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync); syncPreStop(pVnode->sync);
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) { if (pVnode->blocked) {
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId); vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
pVnode->blocked = false; pVnode->blocked = false;
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
void vnodeSyncPostClose(SVnode *pVnode) { void vnodeSyncPostClose(SVnode *pVnode) {
@ -767,7 +767,7 @@ void vnodeSyncClose(SVnode *pVnode) {
void vnodeSyncCheckTimeout(SVnode *pVnode) { void vnodeSyncCheckTimeout(SVnode *pVnode) {
vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId); vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
taosThreadMutexLock(&pVnode->lock); (void)taosThreadMutexLock(&pVnode->lock);
if (pVnode->blocked) { if (pVnode->blocked) {
int32_t curSec = taosGetTimestampSec(); int32_t curSec = taosGetTimestampSec();
int32_t delta = curSec - pVnode->blockSec; int32_t delta = curSec - pVnode->blockSec;
@ -788,7 +788,7 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
} }
taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
bool vnodeIsRoleLeader(SVnode *pVnode) { bool vnodeIsRoleLeader(SVnode *pVnode) {

View File

@ -28,7 +28,7 @@ char *tsMonSlowLogUri = "/slow-sql-detail-batch";
char *tsMonFwBasicUri = "/taosd-cluster-basic"; char *tsMonFwBasicUri = "/taosd-cluster-basic";
void monRecordLog(int64_t ts, ELogLevel level, const char *content) { void monRecordLog(int64_t ts, ELogLevel level, const char *content) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
int32_t size = taosArrayGetSize(tsMonitor.logs); int32_t size = taosArrayGetSize(tsMonitor.logs);
if (size < tsMonitor.cfg.maxLogs) { if (size < tsMonitor.cfg.maxLogs) {
SMonLogItem item = {.ts = ts, .level = level}; SMonLogItem item = {.ts = ts, .level = level};
@ -37,11 +37,11 @@ void monRecordLog(int64_t ts, ELogLevel level, const char *content) {
tstrncpy(pItem->content, content, MON_LOG_LEN); tstrncpy(pItem->content, content, MON_LOG_LEN);
} }
} }
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
} }
int32_t monGetLogs(SMonLogs *logs) { int32_t monGetLogs(SMonLogs *logs) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
logs->logs = taosArrayDup(tsMonitor.logs, NULL); logs->logs = taosArrayDup(tsMonitor.logs, NULL);
logs->numOfInfoLogs = tsNumOfInfoLogs; logs->numOfInfoLogs = tsNumOfInfoLogs;
logs->numOfErrorLogs = tsNumOfErrorLogs; logs->numOfErrorLogs = tsNumOfErrorLogs;
@ -52,7 +52,7 @@ int32_t monGetLogs(SMonLogs *logs) {
tsNumOfDebugLogs = 0; tsNumOfDebugLogs = 0;
tsNumOfTraceLogs = 0; tsNumOfTraceLogs = 0;
taosArrayClear(tsMonitor.logs); taosArrayClear(tsMonitor.logs);
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
if (logs->logs == NULL) { if (logs->logs == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
} }
@ -60,44 +60,44 @@ int32_t monGetLogs(SMonLogs *logs) {
} }
void monSetDmInfo(SMonDmInfo *pInfo) { void monSetDmInfo(SMonDmInfo *pInfo) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&tsMonitor.dmInfo, pInfo, sizeof(SMonDmInfo)); memcpy(&tsMonitor.dmInfo, pInfo, sizeof(SMonDmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
memset(pInfo, 0, sizeof(SMonDmInfo)); memset(pInfo, 0, sizeof(SMonDmInfo));
} }
void monSetMmInfo(SMonMmInfo *pInfo) { void monSetMmInfo(SMonMmInfo *pInfo) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&tsMonitor.mmInfo, pInfo, sizeof(SMonMmInfo)); memcpy(&tsMonitor.mmInfo, pInfo, sizeof(SMonMmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
memset(pInfo, 0, sizeof(SMonMmInfo)); memset(pInfo, 0, sizeof(SMonMmInfo));
} }
void monSetVmInfo(SMonVmInfo *pInfo) { void monSetVmInfo(SMonVmInfo *pInfo) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&tsMonitor.vmInfo, pInfo, sizeof(SMonVmInfo)); memcpy(&tsMonitor.vmInfo, pInfo, sizeof(SMonVmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
memset(pInfo, 0, sizeof(SMonVmInfo)); memset(pInfo, 0, sizeof(SMonVmInfo));
} }
void monSetQmInfo(SMonQmInfo *pInfo) { void monSetQmInfo(SMonQmInfo *pInfo) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&tsMonitor.qmInfo, pInfo, sizeof(SMonQmInfo)); memcpy(&tsMonitor.qmInfo, pInfo, sizeof(SMonQmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
memset(pInfo, 0, sizeof(SMonQmInfo)); memset(pInfo, 0, sizeof(SMonQmInfo));
} }
void monSetSmInfo(SMonSmInfo *pInfo) { void monSetSmInfo(SMonSmInfo *pInfo) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&tsMonitor.smInfo, pInfo, sizeof(SMonSmInfo)); memcpy(&tsMonitor.smInfo, pInfo, sizeof(SMonSmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
memset(pInfo, 0, sizeof(SMonSmInfo)); memset(pInfo, 0, sizeof(SMonSmInfo));
} }
void monSetBmInfo(SMonBmInfo *pInfo) { void monSetBmInfo(SMonBmInfo *pInfo) {
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&tsMonitor.bmInfo, pInfo, sizeof(SMonBmInfo)); memcpy(&tsMonitor.bmInfo, pInfo, sizeof(SMonBmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
memset(pInfo, 0, sizeof(SMonBmInfo)); memset(pInfo, 0, sizeof(SMonBmInfo));
} }
@ -153,7 +153,7 @@ static SMonInfo *monCreateMonitorInfo() {
monGetLogs(&pMonitor->log); monGetLogs(&pMonitor->log);
taosThreadMutexLock(&tsMonitor.lock); (void)taosThreadMutexLock(&tsMonitor.lock);
memcpy(&pMonitor->dmInfo, &tsMonitor.dmInfo, sizeof(SMonDmInfo)); memcpy(&pMonitor->dmInfo, &tsMonitor.dmInfo, sizeof(SMonDmInfo));
memcpy(&pMonitor->mmInfo, &tsMonitor.mmInfo, sizeof(SMonMmInfo)); memcpy(&pMonitor->mmInfo, &tsMonitor.mmInfo, sizeof(SMonMmInfo));
memcpy(&pMonitor->vmInfo, &tsMonitor.vmInfo, sizeof(SMonVmInfo)); memcpy(&pMonitor->vmInfo, &tsMonitor.vmInfo, sizeof(SMonVmInfo));
@ -166,7 +166,7 @@ static SMonInfo *monCreateMonitorInfo() {
memset(&tsMonitor.smInfo, 0, sizeof(SMonSmInfo)); memset(&tsMonitor.smInfo, 0, sizeof(SMonSmInfo));
memset(&tsMonitor.qmInfo, 0, sizeof(SMonQmInfo)); memset(&tsMonitor.qmInfo, 0, sizeof(SMonQmInfo));
memset(&tsMonitor.bmInfo, 0, sizeof(SMonBmInfo)); memset(&tsMonitor.bmInfo, 0, sizeof(SMonBmInfo));
taosThreadMutexUnlock(&tsMonitor.lock); (void)taosThreadMutexUnlock(&tsMonitor.lock);
pMonitor->pJson = tjsonCreateObject(); pMonitor->pJson = tjsonCreateObject();
if (pMonitor->pJson == NULL || pMonitor->log.logs == NULL) { if (pMonitor->pJson == NULL || pMonitor->log.logs == NULL) {

View File

@ -76,9 +76,9 @@ static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
static void httpDestroyMsg(SHttpMsg* msg); static void httpDestroyMsg(SHttpMsg* msg);
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
@ -91,27 +91,27 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) { EHttpCompFlag flag) {
int32_t code = 0; int32_t code = 0;
int32_t len = 0; int32_t len = 0;
if (flag == HTTP_FLAT) { if (flag == HTTP_FLAT) {
len = snprintf(pHead, headLen, len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n" "POST %s HTTP/1.1\n"
"Host: %s\n" "Host: %s\n"
"Content-Type: application/json\n" "Content-Type: application/json\n"
"Content-Length: %d\n\n", "Content-Length: %d\n\n",
uri, server, contLen); uri, server, contLen);
if (len < 0 || len >= headLen) { if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE; code = TSDB_CODE_OUT_OF_RANGE;
} }
} else if (flag == HTTP_GZIP) { } else if (flag == HTTP_GZIP) {
len = snprintf(pHead, headLen, len = snprintf(pHead, headLen,
"POST %s HTTP/1.1\n" "POST %s HTTP/1.1\n"
"Host: %s\n" "Host: %s\n"
"Content-Type: application/json\n" "Content-Type: application/json\n"
"Content-Encoding: gzip\n" "Content-Encoding: gzip\n"
"Content-Length: %d\n\n", "Content-Length: %d\n\n",
uri, server, contLen); uri, server, contLen);
if (len < 0 || len >= headLen) { if (len < 0 || len >= headLen) {
code = TSDB_CODE_OUT_OF_RANGE; code = TSDB_CODE_OUT_OF_RANGE;
} }
@ -127,7 +127,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
void* pDest = taosMemoryMalloc(destLen); void* pDest = taosMemoryMalloc(destLen);
if (pDest == NULL) { if (pDest == NULL) {
code= TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER; goto _OVER;
} }
@ -184,14 +184,14 @@ _OVER:
if (code == 0) { if (code == 0) {
memcpy(pSrc, pDest, gzipStream.total_out); memcpy(pSrc, pDest, gzipStream.total_out);
code = gzipStream.total_out; code = gzipStream.total_out;
} }
taosMemoryFree(pDest); taosMemoryFree(pDest);
return code; return code;
} }
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
uint32_t ip = 0; uint32_t ip = 0;
int32_t code = taosGetIpv4FromFqdn(server, &ip); int32_t code = taosGetIpv4FromFqdn(server, &ip);
if (code) { if (code) {
tError("http-report failed to resolving domain names: %s", server); tError("http-report failed to resolving domain names: %s", server);
return TSDB_CODE_RPC_FQDN_ERROR; return TSDB_CODE_RPC_FQDN_ERROR;
@ -293,7 +293,7 @@ static void httpAsyncCb(uv_async_t* handle) {
static int32_t BATCH_SIZE = 5; static int32_t BATCH_SIZE = 5;
int32_t count = 0; int32_t count = 0;
taosThreadMutexLock(&item->mtx); (void)taosThreadMutexLock(&item->mtx);
httpMayDiscardMsg(http, item); httpMayDiscardMsg(http, item);
while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) { while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) {
@ -301,7 +301,7 @@ static void httpAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
QUEUE_PUSH(&wq, h); QUEUE_PUSH(&wq, h);
} }
taosThreadMutexUnlock(&item->mtx); (void)taosThreadMutexUnlock(&item->mtx);
while (!QUEUE_IS_EMPTY(&wq)) { while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq); queue* h = QUEUE_HEAD(&wq);
@ -636,8 +636,8 @@ void httpModuleDestroy2(SHttpModule* http) {
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId) { int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
SHttpModule* load = NULL; SHttpModule* load = NULL;
SHttpMsg *msg = NULL; SHttpMsg* msg = NULL;
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg); int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg);
if (code != 0) { if (code != 0) {
goto _ERROR; goto _ERROR;
} }

View File

@ -112,7 +112,7 @@ static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
#if defined(LINUX) #if defined(LINUX)
taosThreadRwlockWrlock(&pCacheObj->lock); taosThreadRwlockWrlock(&pCacheObj->lock);
#else #else
taosThreadMutexLock(&pCacheObj->lock); (void)taosThreadMutexLock(&pCacheObj->lock);
#endif #endif
} }
@ -120,7 +120,7 @@ static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
#if defined(LINUX) #if defined(LINUX)
taosThreadRwlockUnlock(&pCacheObj->lock); taosThreadRwlockUnlock(&pCacheObj->lock);
#else #else
taosThreadMutexUnlock(&pCacheObj->lock); (void)taosThreadMutexUnlock(&pCacheObj->lock);
#endif #endif
} }
@ -168,9 +168,9 @@ static void doInitRefreshThread(void) {
TdThread doRegisterCacheObj(SCacheObj *pCacheObj) { TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
taosThreadOnce(&cacheThreadInit, doInitRefreshThread); taosThreadOnce(&cacheThreadInit, doInitRefreshThread);
taosThreadMutexLock(&guard); (void)taosThreadMutexLock(&guard);
taosArrayPush(pCacheArrayList, &pCacheObj); (void)taosArrayPush(pCacheArrayList, &pCacheObj);
taosThreadMutexUnlock(&guard); (void)taosThreadMutexUnlock(&guard);
return cacheRefreshWorker; return cacheRefreshWorker;
} }
@ -840,19 +840,19 @@ void *taosCacheTimedRefresh(void *handle) {
goto _end; goto _end;
} }
taosThreadMutexLock(&guard); (void)taosThreadMutexLock(&guard);
size_t size = taosArrayGetSize(pCacheArrayList); size_t size = taosArrayGetSize(pCacheArrayList);
taosThreadMutexUnlock(&guard); (void)taosThreadMutexUnlock(&guard);
count += 1; count += 1;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
taosThreadMutexLock(&guard); (void)taosThreadMutexLock(&guard);
SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i); SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
if (pCacheObj == NULL) { if (pCacheObj == NULL) {
uError("object is destroyed. ignore and try next"); uError("object is destroyed. ignore and try next");
taosThreadMutexUnlock(&guard); (void)taosThreadMutexUnlock(&guard);
continue; continue;
} }
@ -864,11 +864,11 @@ void *taosCacheTimedRefresh(void *handle) {
uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size); uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
pCacheObj->deleting = 0; // reset the deleting flag to enable pCacheObj to continue releasing resources. pCacheObj->deleting = 0; // reset the deleting flag to enable pCacheObj to continue releasing resources.
taosThreadMutexUnlock(&guard); (void)taosThreadMutexUnlock(&guard);
continue; continue;
} }
taosThreadMutexUnlock(&guard); (void)taosThreadMutexUnlock(&guard);
if ((count % pCacheObj->checkTick) != 0) { if ((count % pCacheObj->checkTick) != 0) {
continue; continue;

View File

@ -44,7 +44,7 @@ int32_t taosAllocateId(id_pool_t *pIdPool) {
if (pIdPool == NULL) return -1; if (pIdPool == NULL) return -1;
int32_t slot = -1; int32_t slot = -1;
taosThreadMutexLock(&pIdPool->mutex); (void)taosThreadMutexLock(&pIdPool->mutex);
if (pIdPool->numOfFree > 0) { if (pIdPool->numOfFree > 0) {
for (int32_t i = 0; i < pIdPool->maxId; ++i) { for (int32_t i = 0; i < pIdPool->maxId; ++i) {
@ -58,14 +58,14 @@ int32_t taosAllocateId(id_pool_t *pIdPool) {
} }
} }
taosThreadMutexUnlock(&pIdPool->mutex); (void)taosThreadMutexUnlock(&pIdPool->mutex);
return slot + 1; return slot + 1;
} }
void taosFreeId(id_pool_t *pIdPool, int32_t id) { void taosFreeId(id_pool_t *pIdPool, int32_t id) {
if (pIdPool == NULL) return; if (pIdPool == NULL) return;
taosThreadMutexLock(&pIdPool->mutex); (void)taosThreadMutexLock(&pIdPool->mutex);
int32_t slot = (id - 1) % pIdPool->maxId; int32_t slot = (id - 1) % pIdPool->maxId;
if (pIdPool->freeList[slot]) { if (pIdPool->freeList[slot]) {
@ -73,7 +73,7 @@ void taosFreeId(id_pool_t *pIdPool, int32_t id) {
pIdPool->numOfFree++; pIdPool->numOfFree++;
} }
taosThreadMutexUnlock(&pIdPool->mutex); (void)taosThreadMutexUnlock(&pIdPool->mutex);
} }
void taosIdPoolCleanUp(id_pool_t *pIdPool) { void taosIdPoolCleanUp(id_pool_t *pIdPool) {
@ -91,16 +91,16 @@ void taosIdPoolCleanUp(id_pool_t *pIdPool) {
} }
int32_t taosIdPoolNumOfUsed(id_pool_t *pIdPool) { int32_t taosIdPoolNumOfUsed(id_pool_t *pIdPool) {
taosThreadMutexLock(&pIdPool->mutex); (void)taosThreadMutexLock(&pIdPool->mutex);
int32_t ret = pIdPool->maxId - pIdPool->numOfFree; int32_t ret = pIdPool->maxId - pIdPool->numOfFree;
taosThreadMutexUnlock(&pIdPool->mutex); (void)taosThreadMutexUnlock(&pIdPool->mutex);
return ret; return ret;
} }
bool taosIdPoolMarkStatus(id_pool_t *pIdPool, int32_t id) { bool taosIdPoolMarkStatus(id_pool_t *pIdPool, int32_t id) {
bool ret = false; bool ret = false;
taosThreadMutexLock(&pIdPool->mutex); (void)taosThreadMutexLock(&pIdPool->mutex);
int32_t slot = (id - 1) % pIdPool->maxId; int32_t slot = (id - 1) % pIdPool->maxId;
if (!pIdPool->freeList[slot]) { if (!pIdPool->freeList[slot]) {
@ -111,7 +111,7 @@ bool taosIdPoolMarkStatus(id_pool_t *pIdPool, int32_t id) {
ret = false; ret = false;
} }
taosThreadMutexUnlock(&pIdPool->mutex); (void)taosThreadMutexUnlock(&pIdPool->mutex);
return ret; return ret;
} }
@ -125,7 +125,7 @@ int32_t taosUpdateIdPool(id_pool_t *pIdPool, int32_t maxId) {
return terrno; return terrno;
} }
taosThreadMutexLock(&pIdPool->mutex); (void)taosThreadMutexLock(&pIdPool->mutex);
memcpy(idList, pIdPool->freeList, sizeof(bool) * pIdPool->maxId); memcpy(idList, pIdPool->freeList, sizeof(bool) * pIdPool->maxId);
pIdPool->numOfFree += (maxId - pIdPool->maxId); pIdPool->numOfFree += (maxId - pIdPool->maxId);
@ -135,15 +135,15 @@ int32_t taosUpdateIdPool(id_pool_t *pIdPool, int32_t maxId) {
pIdPool->freeList = idList; pIdPool->freeList = idList;
taosMemoryFree(oldIdList); taosMemoryFree(oldIdList);
taosThreadMutexUnlock(&pIdPool->mutex); (void)taosThreadMutexUnlock(&pIdPool->mutex);
return 0; return 0;
} }
int32_t taosIdPoolMaxSize(id_pool_t *pIdPool) { int32_t taosIdPoolMaxSize(id_pool_t *pIdPool) {
taosThreadMutexLock(&pIdPool->mutex); (void)taosThreadMutexLock(&pIdPool->mutex);
int32_t ret = pIdPool->maxId; int32_t ret = pIdPool->maxId;
taosThreadMutexUnlock(&pIdPool->mutex); (void)taosThreadMutexUnlock(&pIdPool->mutex);
return ret; return ret;
} }

View File

@ -362,7 +362,7 @@ static void *taosThreadToCloseOldFile(void *param) {
} }
static int32_t taosOpenNewLogFile() { static int32_t taosOpenNewLogFile() {
taosThreadMutexLock(&tsLogObj.logMutex); (void)taosThreadMutexLock(&tsLogObj.logMutex);
if (tsLogObj.lines > tsNumOfLogLines && tsLogObj.openInProgress == 0) { if (tsLogObj.lines > tsNumOfLogLines && tsLogObj.openInProgress == 0) {
tsLogObj.openInProgress = 1; tsLogObj.openInProgress = 1;
@ -378,7 +378,7 @@ static int32_t taosOpenNewLogFile() {
taosThreadAttrDestroy(&attr); taosThreadAttrDestroy(&attr);
} }
taosThreadMutexUnlock(&tsLogObj.logMutex); (void)taosThreadMutexUnlock(&tsLogObj.logMutex);
return 0; return 0;
} }
@ -719,7 +719,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
if (pLogBuf == NULL || pLogBuf->stop) return -1; if (pLogBuf == NULL || pLogBuf->stop) return -1;
taosThreadMutexLock(&LOG_BUF_MUTEX(pLogBuf)); (void)taosThreadMutexLock(&LOG_BUF_MUTEX(pLogBuf));
start = LOG_BUF_START(pLogBuf); start = LOG_BUF_START(pLogBuf);
end = LOG_BUF_END(pLogBuf); end = LOG_BUF_END(pLogBuf);
@ -733,7 +733,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) { if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) {
lostLine++; lostLine++;
tsAsyncLogLostLines++; tsAsyncLogLostLines++;
taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf)); (void)taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
return -1; return -1;
} }
@ -754,7 +754,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
} }
*/ */
taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf)); (void)taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
return 0; return 0;
} }

View File

@ -86,11 +86,11 @@ void taosCloseQueue(STaosQueue *queue) {
STaosQnode *pTemp; STaosQnode *pTemp;
STaosQset *qset; STaosQset *qset;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
STaosQnode *pNode = queue->head; STaosQnode *pNode = queue->head;
queue->head = NULL; queue->head = NULL;
qset = queue->qset; qset = queue->qset;
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
if (queue->qset) { if (queue->qset) {
taosRemoveFromQset(qset, queue); taosRemoveFromQset(qset, queue);
@ -112,11 +112,11 @@ bool taosQueueEmpty(STaosQueue *queue) {
if (queue == NULL) return true; if (queue == NULL) return true;
bool empty = false; bool empty = false;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) { if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
empty = true; empty = true;
} }
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
return empty; return empty;
} }
@ -124,26 +124,26 @@ bool taosQueueEmpty(STaosQueue *queue) {
void taosUpdateItemSize(STaosQueue *queue, int32_t items) { void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
if (queue == NULL) return; if (queue == NULL) return;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
queue->numOfItems -= items; queue->numOfItems -= items;
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
} }
int32_t taosQueueItemSize(STaosQueue *queue) { int32_t taosQueueItemSize(STaosQueue *queue) {
if (queue == NULL) return 0; if (queue == NULL) return 0;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
int32_t numOfItems = queue->numOfItems; int32_t numOfItems = queue->numOfItems;
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems); uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
return numOfItems; return numOfItems;
} }
int64_t taosQueueMemorySize(STaosQueue *queue) { int64_t taosQueueMemorySize(STaosQueue *queue) {
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
int64_t memOfItems = queue->memOfItems; int64_t memOfItems = queue->memOfItems;
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
return memOfItems; return memOfItems;
} }
@ -198,19 +198,19 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
pNode->timestamp = taosGetTimestampUs(); pNode->timestamp = taosGetTimestampUs();
pNode->next = NULL; pNode->next = NULL;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) { if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
queue->memLimit, tstrerror(code)); queue->memLimit, tstrerror(code));
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
return code; return code;
} else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) { } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
queue->itemLimit, tstrerror(code)); queue->itemLimit, tstrerror(code));
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
return code; return code;
} }
@ -229,7 +229,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
if (queue->qset) { if (queue->qset) {
tsem_post(&queue->qset->sem); tsem_post(&queue->qset->sem);
@ -241,7 +241,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
@ -260,7 +260,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
queue->memOfItems); queue->memOfItems);
} }
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
return code; return code;
} }
@ -279,7 +279,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
int32_t numOfItems = 0; int32_t numOfItems = 0;
bool empty; bool empty;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
empty = queue->head == NULL; empty = queue->head == NULL;
if (!empty) { if (!empty) {
@ -305,7 +305,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
} }
} }
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
// if source queue is empty, we set destination qall to empty too. // if source queue is empty, we set destination qall to empty too.
if (empty) { if (empty) {
@ -355,7 +355,7 @@ void taosCloseQset(STaosQset *qset) {
if (qset == NULL) return; if (qset == NULL) return;
// remove all the queues from qset // remove all the queues from qset
taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
while (qset->head) { while (qset->head) {
STaosQueue *queue = qset->head; STaosQueue *queue = qset->head;
qset->head = qset->head->next; qset->head = qset->head->next;
@ -363,7 +363,7 @@ void taosCloseQset(STaosQset *qset) {
queue->qset = NULL; queue->qset = NULL;
queue->next = NULL; queue->next = NULL;
} }
taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
taosThreadMutexDestroy(&qset->mutex); taosThreadMutexDestroy(&qset->mutex);
tsem_destroy(&qset->sem); tsem_destroy(&qset->sem);
@ -382,19 +382,19 @@ void taosQsetThreadResume(STaosQset *qset) {
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) { int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
if (queue->qset) return -1; if (queue->qset) return -1;
taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
queue->next = qset->head; queue->next = qset->head;
queue->ahandle = ahandle; queue->ahandle = ahandle;
qset->head = queue; qset->head = queue;
qset->numOfQueues++; qset->numOfQueues++;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems); atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
queue->qset = qset; queue->qset = qset;
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
uTrace("queue:%p is added into qset:%p", queue, qset); uTrace("queue:%p is added into qset:%p", queue, qset);
return 0; return 0;
@ -403,7 +403,7 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
STaosQueue *tqueue = NULL; STaosQueue *tqueue = NULL;
taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
if (qset->head) { if (qset->head) {
if (qset->head == queue) { if (qset->head == queue) {
@ -427,15 +427,15 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
if (qset->current == queue) qset->current = tqueue->next; if (qset->current == queue) qset->current = tqueue->next;
qset->numOfQueues--; qset->numOfQueues--;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
queue->qset = NULL; queue->qset = NULL;
queue->next = NULL; queue->next = NULL;
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
} }
} }
taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
uDebug("queue:%p is removed from qset:%p", queue, qset); uDebug("queue:%p is removed from qset:%p", queue, qset);
} }
@ -446,7 +446,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
tsem_wait(&qset->sem); tsem_wait(&qset->sem);
taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) { for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head; if (qset->current == NULL) qset->current = qset->head;
@ -455,7 +455,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
if (queue == NULL) break; if (queue == NULL) break;
if (queue->head == NULL) continue; if (queue->head == NULL) continue;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
@ -475,11 +475,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
queue->memOfItems); queue->memOfItems);
} }
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
if (pNode) break; if (pNode) break;
} }
taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
return code; return code;
} }
@ -489,7 +489,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
int32_t code = 0; int32_t code = 0;
tsem_wait(&qset->sem); tsem_wait(&qset->sem);
taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) { for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head; if (qset->current == NULL) qset->current = qset->head;
@ -498,7 +498,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
if (queue == NULL) break; if (queue == NULL) break;
if (queue->head == NULL) continue; if (queue->head == NULL) continue;
taosThreadMutexLock(&queue->mutex); (void)taosThreadMutexLock(&queue->mutex);
if (queue->head) { if (queue->head) {
qall->current = queue->head; qall->current = queue->head;
@ -526,12 +526,12 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
} }
} }
taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
if (code != 0) break; if (code != 0) break;
} }
taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
return code; return code;
} }
@ -554,11 +554,11 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
if (pItem == NULL) return; if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) { for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
tsem_post(&qset->sem); tsem_post(&qset->sem);
} }
taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
} }
#endif #endif

View File

@ -76,7 +76,7 @@ int32_t taosOpenRef(int32_t max, RefFp fp) {
return terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
} }
taosThreadMutexLock(&tsRefMutex); (void)taosThreadMutexLock(&tsRefMutex);
for (i = 0; i < TSDB_REF_OBJECTS; ++i) { for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
@ -105,7 +105,7 @@ int32_t taosOpenRef(int32_t max, RefFp fp) {
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
} }
taosThreadMutexUnlock(&tsRefMutex); (void)taosThreadMutexUnlock(&tsRefMutex);
return rsetId; return rsetId;
} }
@ -121,7 +121,7 @@ int32_t taosCloseRef(int32_t rsetId) {
pSet = tsRefSetList + rsetId; pSet = tsRefSetList + rsetId;
taosThreadMutexLock(&tsRefMutex); (void)taosThreadMutexLock(&tsRefMutex);
if (pSet->state == TSDB_REF_STATE_ACTIVE) { if (pSet->state == TSDB_REF_STATE_ACTIVE) {
pSet->state = TSDB_REF_STATE_DELETED; pSet->state = TSDB_REF_STATE_DELETED;
@ -131,7 +131,7 @@ int32_t taosCloseRef(int32_t rsetId) {
uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count); uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
} }
taosThreadMutexUnlock(&tsRefMutex); (void)taosThreadMutexUnlock(&tsRefMutex);
if (deleted) taosDecRsetCount(pSet); if (deleted) taosDecRsetCount(pSet);
@ -349,7 +349,7 @@ int32_t taosListRef() {
SRefNode *pNode; SRefNode *pNode;
int32_t num = 0; int32_t num = 0;
taosThreadMutexLock(&tsRefMutex); (void)taosThreadMutexLock(&tsRefMutex);
for (int32_t i = 0; i < TSDB_REF_OBJECTS; ++i) { for (int32_t i = 0; i < TSDB_REF_OBJECTS; ++i) {
pSet = tsRefSetList + i; pSet = tsRefSetList + i;
@ -369,7 +369,7 @@ int32_t taosListRef() {
} }
} }
taosThreadMutexUnlock(&tsRefMutex); (void)taosThreadMutexUnlock(&tsRefMutex);
return num; return num;
} }
@ -475,7 +475,7 @@ static void taosDecRsetCount(SRefSet *pSet) {
if (count > 0) return; if (count > 0) return;
taosThreadMutexLock(&tsRefMutex); (void)taosThreadMutexLock(&tsRefMutex);
if (pSet->state != TSDB_REF_STATE_EMPTY) { if (pSet->state != TSDB_REF_STATE_EMPTY) {
pSet->state = TSDB_REF_STATE_EMPTY; pSet->state = TSDB_REF_STATE_EMPTY;
@ -489,5 +489,5 @@ static void taosDecRsetCount(SRefSet *pSet) {
uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count); uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
} }
taosThreadMutexUnlock(&tsRefMutex); (void)taosThreadMutexUnlock(&tsRefMutex);
} }

View File

@ -230,7 +230,7 @@ static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
timer->prev = NULL; timer->prev = NULL;
timer->expireAt = taosGetMonotonicMs() + delay; timer->expireAt = taosGetMonotonicMs() + delay;
taosThreadMutexLock(&wheel->mutex); (void)taosThreadMutexLock(&wheel->mutex);
uint32_t idx = 0; uint32_t idx = 0;
if (timer->expireAt > wheel->nextScanAt) { if (timer->expireAt > wheel->nextScanAt) {
@ -248,7 +248,7 @@ static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
p->prev = timer; p->prev = timer;
} }
taosThreadMutexUnlock(&wheel->mutex); (void)taosThreadMutexUnlock(&wheel->mutex);
} }
static bool removeFromWheel(tmr_obj_t* timer) { static bool removeFromWheel(tmr_obj_t* timer) {
@ -259,7 +259,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
time_wheel_t* wheel = wheels + wheelIdx; time_wheel_t* wheel = wheels + wheelIdx;
bool removed = false; bool removed = false;
taosThreadMutexLock(&wheel->mutex); (void)taosThreadMutexLock(&wheel->mutex);
// other thread may modify timer->wheel, check again. // other thread may modify timer->wheel, check again.
if (timer->wheel < tListLen(wheels)) { if (timer->wheel < tListLen(wheels)) {
if (timer->prev != NULL) { if (timer->prev != NULL) {
@ -277,7 +277,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
timerDecRef(timer); timerDecRef(timer);
removed = true; removed = true;
} }
taosThreadMutexUnlock(&wheel->mutex); (void)taosThreadMutexUnlock(&wheel->mutex);
return removed; return removed;
} }
@ -372,7 +372,7 @@ static void taosTimerLoopFunc(int32_t signo) {
time_wheel_t* wheel = wheels + i; time_wheel_t* wheel = wheels + i;
while (now >= wheel->nextScanAt) { while (now >= wheel->nextScanAt) {
taosThreadMutexLock(&wheel->mutex); (void)taosThreadMutexLock(&wheel->mutex);
wheel->index = (wheel->index + 1) % wheel->size; wheel->index = (wheel->index + 1) % wheel->size;
tmr_obj_t* timer = wheel->slots[wheel->index]; tmr_obj_t* timer = wheel->slots[wheel->index];
while (timer != NULL) { while (timer != NULL) {
@ -407,7 +407,7 @@ static void taosTimerLoopFunc(int32_t signo) {
timer = next; timer = next;
} }
wheel->nextScanAt += wheel->resolution; wheel->nextScanAt += wheel->resolution;
taosThreadMutexUnlock(&wheel->mutex); (void)taosThreadMutexUnlock(&wheel->mutex);
} }
addToExpired(expired); addToExpired(expired);
@ -594,13 +594,13 @@ void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, con
return NULL; return NULL;
} }
taosThreadMutexLock(&tmrCtrlMutex); (void)taosThreadMutexLock(&tmrCtrlMutex);
tmr_ctrl_t* ctrl = unusedTmrCtrl; tmr_ctrl_t* ctrl = unusedTmrCtrl;
if (ctrl != NULL) { if (ctrl != NULL) {
unusedTmrCtrl = ctrl->next; unusedTmrCtrl = ctrl->next;
numOfTmrCtrl++; numOfTmrCtrl++;
} }
taosThreadMutexUnlock(&tmrCtrlMutex); (void)taosThreadMutexUnlock(&tmrCtrlMutex);
if (ctrl == NULL) { if (ctrl == NULL) {
tmrError("%s too many timer controllers, failed to create timer controller.", label); tmrError("%s too many timer controllers, failed to create timer controller.", label);
@ -623,11 +623,11 @@ void taosTmrCleanUp(void* handle) {
tmrDebug("%s timer controller is cleaned up.", ctrl->label); tmrDebug("%s timer controller is cleaned up.", ctrl->label);
ctrl->label[0] = 0; ctrl->label[0] = 0;
taosThreadMutexLock(&tmrCtrlMutex); (void)taosThreadMutexLock(&tmrCtrlMutex);
ctrl->next = unusedTmrCtrl; ctrl->next = unusedTmrCtrl;
numOfTmrCtrl--; numOfTmrCtrl--;
unusedTmrCtrl = ctrl; unusedTmrCtrl = ctrl;
taosThreadMutexUnlock(&tmrCtrlMutex); (void)taosThreadMutexUnlock(&tmrCtrlMutex);
tmrDebug("time controller's tmr ctrl size: %d", numOfTmrCtrl); tmrDebug("time controller's tmr ctrl size: %d", numOfTmrCtrl);
if (numOfTmrCtrl <= 0) { if (numOfTmrCtrl <= 0) {

View File

@ -120,7 +120,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
return NULL; return NULL;
} }
taosThreadMutexLock(&pool->mutex); (void)taosThreadMutexLock(&pool->mutex);
taosSetQueueFp(queue, fp, NULL); taosSetQueueFp(queue, fp, NULL);
taosAddIntoQset(pool->qset, queue, ahandle); taosAddIntoQset(pool->qset, queue, ahandle);
@ -146,7 +146,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
} while (pool->num < pool->min); } while (pool->num < pool->min);
} }
taosThreadMutexUnlock(&pool->mutex); (void)taosThreadMutexUnlock(&pool->mutex);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue; return queue;
@ -251,7 +251,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
return NULL; return NULL;
} }
taosThreadMutexLock(&pool->mutex); (void)taosThreadMutexLock(&pool->mutex);
taosSetQueueFp(queue, fp, NULL); taosSetQueueFp(queue, fp, NULL);
taosAddIntoQset(pool->qset, queue, ahandle); taosAddIntoQset(pool->qset, queue, ahandle);
@ -267,7 +267,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
uError("worker:%s:%d failed to create", pool->name, curWorkerNum); uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
taosMemoryFree(worker); taosMemoryFree(worker);
taosCloseQueue(queue); taosCloseQueue(queue);
taosThreadMutexUnlock(&pool->mutex); (void)taosThreadMutexUnlock(&pool->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -294,7 +294,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
curWorkerNum++; curWorkerNum++;
} }
taosThreadMutexUnlock(&pool->mutex); (void)taosThreadMutexUnlock(&pool->mutex);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue; return queue;
@ -393,7 +393,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
} }
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
taosThreadMutexLock(&pool->mutex); (void)taosThreadMutexLock(&pool->mutex);
SWWorker *worker = pool->workers + pool->nextId; SWWorker *worker = pool->workers + pool->nextId;
int32_t code = -1; int32_t code = -1;
STaosQueue *queue; STaosQueue *queue;
@ -427,7 +427,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
} }
_OVER: _OVER:
taosThreadMutexUnlock(&pool->mutex); (void)taosThreadMutexUnlock(&pool->mutex);
if (code) { if (code) {
if (queue != NULL) taosCloseQueue(queue); if (queue != NULL) taosCloseQueue(queue);
@ -675,9 +675,9 @@ static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
while (waiting > 0) { while (waiting > 0) {
int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1); int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
if (waitingNew == waiting) { if (waitingNew == waiting) {
taosThreadMutexLock(&pPool->waitingAfterBlockLock); (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
taosThreadCondSignal(&pPool->waitingAfterBlockCond); taosThreadCondSignal(&pPool->waitingAfterBlockCond);
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
ret = true; ret = true;
break; break;
} }
@ -693,9 +693,9 @@ static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
while (waiting > 0) { while (waiting > 0) {
int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1); int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
if (waitingNew == waiting) { if (waitingNew == waiting) {
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond); taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
ret = true; ret = true;
break; break;
} }
@ -731,18 +731,18 @@ static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
} }
} }
// to wait for process // to wait for process
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
// recovered from waiting // recovered from waiting
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) { bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) || if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) { tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
taosThreadMutexLock(&pPool->poolLock); (void)taosThreadMutexLock(&pPool->poolLock);
SListNode *pNode = listNode(pWorker); SListNode *pNode = listNode(pWorker);
tdListPopNode(pPool->workers, pNode); tdListPopNode(pPool->workers, pNode);
// reclaim some workers // reclaim some workers
@ -757,29 +757,29 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
taosMemoryFree(head); taosMemoryFree(head);
} }
tdListAppendNode(pPool->exitedWorkers, pNode); tdListAppendNode(pPool->exitedWorkers, pNode);
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
return false; return false;
} }
// put back to backup pool // put back to backup pool
tdListAppendNode(pPool->backupWorkers, pNode); tdListAppendNode(pPool->backupWorkers, pNode);
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
// start to wait at backup cond // start to wait at backup cond
taosThreadMutexLock(&pPool->backupLock); (void)taosThreadMutexLock(&pPool->backupLock);
atomic_fetch_add_32(&pPool->backupNum, 1); atomic_fetch_add_32(&pPool->backupNum, 1);
if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock); if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
taosThreadMutexUnlock(&pPool->backupLock); (void)taosThreadMutexUnlock(&pPool->backupLock);
// recovered from backup // recovered from backup
taosThreadMutexLock(&pPool->poolLock); (void)taosThreadMutexLock(&pPool->poolLock);
if (pPool->exit) { if (pPool->exit) {
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
return false; return false;
} }
tdListPopNode(pPool->backupWorkers, pNode); tdListPopNode(pPool->backupWorkers, pNode);
tdListAppendNode(pPool->workers, pNode); tdListAppendNode(pPool->workers, pNode);
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
return true; return true;
} else { } else {
@ -819,7 +819,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
} }
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
taosThreadMutexLock(&pPool->poolLock); (void)taosThreadMutexLock(&pPool->poolLock);
pPool->exit = true; pPool->exit = true;
int32_t size = listNEles(pPool->workers); int32_t size = listNEles(pPool->workers);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
@ -829,31 +829,31 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
taosQsetThreadResume(pPool->qset); taosQsetThreadResume(pPool->qset);
} }
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
taosThreadMutexLock(&pPool->backupLock); (void)taosThreadMutexLock(&pPool->backupLock);
taosThreadCondBroadcast(&pPool->backupCond); taosThreadCondBroadcast(&pPool->backupCond);
taosThreadMutexUnlock(&pPool->backupLock); (void)taosThreadMutexUnlock(&pPool->backupLock);
taosThreadMutexLock(&pPool->waitingAfterBlockLock); (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
taosThreadCondBroadcast(&pPool->waitingAfterBlockCond); taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond); taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
int32_t idx = 0; int32_t idx = 0;
SQueryAutoQWorker *worker = NULL; SQueryAutoQWorker *worker = NULL;
while (true) { while (true) {
taosThreadMutexLock(&pPool->poolLock); (void)taosThreadMutexLock(&pPool->poolLock);
if (listNEles(pPool->workers) == 0) { if (listNEles(pPool->workers) == 0) {
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
break; break;
} }
SListNode *pNode = tdListPopHead(pPool->workers); SListNode *pNode = tdListPopHead(pPool->workers);
worker = (SQueryAutoQWorker *)pNode->data; worker = (SQueryAutoQWorker *)pNode->data;
taosThreadMutexUnlock(&pPool->poolLock); (void)taosThreadMutexUnlock(&pPool->poolLock);
if (worker && taosCheckPthreadValid(worker->thread)) { if (worker && taosCheckPthreadValid(worker->thread)) {
taosThreadJoin(worker->thread, NULL); taosThreadJoin(worker->thread, NULL);
taosThreadClear(&worker->thread); taosThreadClear(&worker->thread);
@ -905,7 +905,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
return NULL; return NULL;
} }
taosThreadMutexLock(&pool->poolLock); (void)taosThreadMutexLock(&pool->poolLock);
taosSetQueueFp(queue, fp, NULL); taosSetQueueFp(queue, fp, NULL);
taosAddIntoQset(pool->qset, queue, ahandle); taosAddIntoQset(pool->qset, queue, ahandle);
SQueryAutoQWorker worker = {0}; SQueryAutoQWorker worker = {0};
@ -944,7 +944,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
} while (pool->num < pool->min); } while (pool->num < pool->min);
} }
taosThreadMutexUnlock(&pool->poolLock); (void)taosThreadMutexUnlock(&pool->poolLock);
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
return queue; return queue;
@ -968,15 +968,15 @@ static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
SQueryAutoQWorker worker = {0}; SQueryAutoQWorker worker = {0};
worker.pool = pool; worker.pool = pool;
worker.backupIdx = -1; worker.backupIdx = -1;
taosThreadMutexLock(&pool->poolLock); (void)taosThreadMutexLock(&pool->poolLock);
worker.id = listNEles(pool->workers); worker.id = listNEles(pool->workers);
SListNode *pNode = tdListAdd(pool->workers, &worker); SListNode *pNode = tdListAdd(pool->workers, &worker);
if (!pNode) { if (!pNode) {
taosThreadMutexUnlock(&pool->poolLock); (void)taosThreadMutexUnlock(&pool->poolLock);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
taosThreadMutexUnlock(&pool->poolLock); (void)taosThreadMutexUnlock(&pool->poolLock);
pWorker = (SQueryAutoQWorker *)pNode->data; pWorker = (SQueryAutoQWorker *)pNode->data;
TdThreadAttr thAttr; TdThreadAttr thAttr;
@ -1015,10 +1015,10 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
taosThreadMutexLock(&pPool->waitingAfterBlockLock); (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1); atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock); if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT; if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }