Merge pull request #26812 from taosdata/enh/TD-30987-20
enh: refactor return code
This commit is contained in:
commit
5e51e4149a
|
@ -260,7 +260,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pDnode->mutex);
|
||||
(void)taosThreadMutexLock(&pDnode->mutex);
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
||||
dInfo("node:%s, start to create", pWrapper->name);
|
||||
|
@ -277,7 +277,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
pWrapper->required = true;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pDnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pDnode->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -316,7 +316,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
|
||||
dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pWrapper->name);
|
||||
|
||||
taosThreadMutexLock(&pDnode->mutex);
|
||||
(void)taosThreadMutexLock(&pDnode->mutex);
|
||||
|
||||
dInfo("node:%s, stopping node", pWrapper->name);
|
||||
dmStopNode(pWrapper);
|
||||
|
@ -325,7 +325,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
|
||||
pWrapper = &pDnode->wrappers[ntype];
|
||||
if (taosMkDir(pWrapper->path) != 0) {
|
||||
taosThreadMutexUnlock(&pDnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pDnode->mutex);
|
||||
code = terrno;
|
||||
dError("failed to create dir:%s since %s", pWrapper->path, tstrerror(code));
|
||||
return code;
|
||||
|
@ -347,7 +347,7 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
pWrapper->required = true;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pDnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pDnode->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -375,7 +375,7 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
return terrno = code;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pDnode->mutex);
|
||||
(void)taosThreadMutexLock(&pDnode->mutex);
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
||||
dInfo("node:%s, start to drop", pWrapper->name);
|
||||
|
@ -395,7 +395,7 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
dmCloseNode(pWrapper);
|
||||
taosRemoveDir(pWrapper->path);
|
||||
}
|
||||
taosThreadMutexUnlock(&pDnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pDnode->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -530,12 +530,12 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
|
|||
uint64_t key[4];
|
||||
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
(void)taosThreadMutexLock(pLock);
|
||||
pMeta->pCache->sTagFilterResCache.accTimes += 1;
|
||||
|
||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
|
||||
if (pHandle == NULL) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -565,7 +565,7 @@ int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pK
|
|||
taosLRUCacheRelease(pCache, pHandle, false);
|
||||
|
||||
// unlock meta
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -652,7 +652,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
uint64_t key[4] = {0};
|
||||
initCacheKey(key, pTableEntry, suid, pKey, keyLen);
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
(void)taosThreadMutexLock(pLock);
|
||||
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
||||
if (pEntry == NULL) {
|
||||
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
|
||||
|
@ -668,7 +668,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
uint64_t* p = (uint64_t*)pNode->data;
|
||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
||||
// we have already found the existed items, no need to added to cache anymore.
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // not equal, append it
|
||||
tdListAppend(&(*pEntry)->list, pKey);
|
||||
|
@ -680,7 +680,7 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
_end:
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
|
||||
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||
|
||||
|
@ -697,11 +697,11 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
|||
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
|
||||
|
||||
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
|
||||
taosThreadMutexLock(pLock);
|
||||
(void)taosThreadMutexLock(pLock);
|
||||
|
||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -717,7 +717,7 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
|||
}
|
||||
|
||||
tdListEmpty(&(*pEntry)->list);
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
|
||||
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -736,12 +736,12 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
|
|||
uint64_t key[4];
|
||||
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
(void)taosThreadMutexLock(pLock);
|
||||
pMeta->pCache->STbGroupResCache.accTimes += 1;
|
||||
|
||||
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
|
||||
if (pHandle == NULL) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -764,7 +764,7 @@ int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, i
|
|||
taosLRUCacheRelease(pCache, pHandle, false);
|
||||
|
||||
// unlock meta
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -829,7 +829,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
uint64_t key[4] = {0};
|
||||
initCacheKey(key, pTableEntry, suid, pKey, keyLen);
|
||||
|
||||
taosThreadMutexLock(pLock);
|
||||
(void)taosThreadMutexLock(pLock);
|
||||
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
|
||||
if (pEntry == NULL) {
|
||||
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
|
||||
|
@ -845,7 +845,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
uint64_t* p = (uint64_t*)pNode->data;
|
||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
||||
// we have already found the existed items, no need to added to cache anymore.
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // not equal, append it
|
||||
tdListAppend(&(*pEntry)->list, pKey);
|
||||
|
@ -857,7 +857,7 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
|||
taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL,
|
||||
TAOS_LRU_PRIORITY_LOW, NULL);
|
||||
_end:
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
|
||||
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
|
||||
|
||||
|
@ -874,11 +874,11 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
|
|||
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
|
||||
|
||||
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
|
||||
taosThreadMutexLock(pLock);
|
||||
(void)taosThreadMutexLock(pLock);
|
||||
|
||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -894,7 +894,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
|
|||
}
|
||||
|
||||
tdListEmpty(&(*pEntry)->list);
|
||||
taosThreadMutexUnlock(pLock);
|
||||
(void)taosThreadMutexUnlock(pLock);
|
||||
|
||||
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -540,7 +540,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
|
||||
// scan tomb data
|
||||
if (tsdb->imem->nDel > 0) {
|
||||
|
@ -572,7 +572,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
|
|||
} else {
|
||||
hasDataToCommit = true;
|
||||
if ((code = tsdbCommitInfoAdd(tsdb, fset->fid))) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
break;
|
||||
|
@ -593,13 +593,13 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
|
|||
if (fset) {
|
||||
code = tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -667,11 +667,11 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t tsdbPreCommit(STsdb *tsdb) {
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
ASSERT_CORE(tsdb->imem == NULL, "imem should be null to commit mem");
|
||||
tsdb->imem = tsdb->mem;
|
||||
tsdb->mem = NULL;
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -686,9 +686,9 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
|
|||
int64_t nDel = imem->nDel;
|
||||
|
||||
if (nRow == 0 && nDel == 0) {
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
tsdb->imem = NULL;
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
tsdbUnrefMemTable(imem, NULL, true);
|
||||
} else {
|
||||
SCommitter2 committer = {0};
|
||||
|
@ -719,10 +719,10 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
|
|||
if (tsdb->imem) {
|
||||
SMemTable *pMemTable = tsdb->imem;
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
|
||||
if ((code = tsdbFSEditCommit(tsdb->pFS))) {
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tsdb->imem = NULL;
|
||||
|
@ -734,7 +734,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
tsdbCommitInfoDestroy(tsdb);
|
||||
tsdbUnrefMemTable(pMemTable, NULL, true);
|
||||
|
@ -757,14 +757,14 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
|
|||
|
||||
TAOS_CHECK_GOTO(tsdbFSEditAbort(pTsdb->pFS), &lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
|
||||
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
|
||||
if (info->fset) {
|
||||
tsdbFinishTaskOnFileSet(pTsdb, info->fid);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
tsdbCommitInfoDestroy(pTsdb);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -761,7 +761,7 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
|
||||
// disable
|
||||
pTsdb->bgTaskDisabled = true;
|
||||
|
@ -772,7 +772,7 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
if (fset->channelOpened) {
|
||||
if (taosArrayPush(channelArray, &fset->channel) == NULL) {
|
||||
taosArrayDestroy(channelArray);
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
return terrno;
|
||||
}
|
||||
fset->channel = (SVAChannelID){0};
|
||||
|
@ -782,7 +782,7 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
||||
// destroy all channels
|
||||
for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
|
||||
|
@ -798,9 +798,9 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
|
|||
}
|
||||
|
||||
int32_t tsdbEnableBgTask(STsdb *pTsdb) {
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
pTsdb->bgTaskDisabled = false;
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -814,16 +814,16 @@ int32_t tsdbCloseFS(STFileSystem **fs) {
|
|||
}
|
||||
|
||||
int64_t tsdbFSAllocEid(STFileSystem *fs) {
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
int64_t cid = ++fs->neid;
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
return cid;
|
||||
}
|
||||
|
||||
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
fs->neid = TMAX(fs->neid, cid);
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
}
|
||||
|
||||
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
|
||||
|
@ -871,7 +871,7 @@ static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
|
|||
}
|
||||
|
||||
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
STFileSet *fset;
|
||||
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
|
||||
if (fset) {
|
||||
|
@ -881,7 +881,7 @@ int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
|
|||
fset->numWaitCommit--;
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -973,7 +973,7 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
|||
|
||||
TARRAY2_INIT(fsetArr[0]);
|
||||
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
|
||||
if (code) break;
|
||||
|
@ -981,7 +981,7 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
|||
code = TARRAY2_APPEND(fsetArr[0], fset1);
|
||||
if (code) break;
|
||||
}
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
|
||||
if (code) {
|
||||
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
|
||||
|
@ -1001,9 +1001,9 @@ int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
|
|||
}
|
||||
|
||||
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1075,7 +1075,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
int64_t ever = VERSION_MAX;
|
||||
if (pHash) {
|
||||
|
@ -1092,7 +1092,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa
|
|||
code = TARRAY2_APPEND(fsetArr[0], fset1);
|
||||
if (code) break;
|
||||
}
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
|
||||
_out:
|
||||
if (code) {
|
||||
|
@ -1131,7 +1131,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
int64_t sver1 = sver;
|
||||
int64_t ever1 = ever;
|
||||
|
@ -1160,7 +1160,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
|
|||
|
||||
fsr1 = NULL;
|
||||
}
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
|
||||
if (code) {
|
||||
tsdbTFileSetRangeClear(&fsr1);
|
||||
|
|
|
@ -230,11 +230,11 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&fobj[0]->mutex, NULL);
|
||||
(void)taosThreadMutexInit(&fobj[0]->mutex, NULL);
|
||||
fobj[0]->f[0] = f[0];
|
||||
fobj[0]->state = TSDB_FSTATE_LIVE;
|
||||
fobj[0]->ref = 1;
|
||||
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
||||
(void)tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
||||
// fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
|
||||
fobj[0]->nlevel = vnodeNodeId(pTsdb->pVnode);
|
||||
return 0;
|
||||
|
@ -242,18 +242,18 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
|||
|
||||
int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
||||
int32_t nRef;
|
||||
taosThreadMutexLock(&fobj->mutex);
|
||||
(void)(void)taosThreadMutexLock(&fobj->mutex);
|
||||
ASSERT(fobj->ref > 0 && fobj->state == TSDB_FSTATE_LIVE);
|
||||
nRef = ++fobj->ref;
|
||||
taosThreadMutexUnlock(&fobj->mutex);
|
||||
(void)(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
tsdbTrace("ref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||
taosThreadMutexLock(&fobj->mutex);
|
||||
(void)(void)taosThreadMutexLock(&fobj->mutex);
|
||||
int32_t nRef = --fobj->ref;
|
||||
taosThreadMutexUnlock(&fobj->mutex);
|
||||
(void)(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
ASSERT(nRef >= 0);
|
||||
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
if (nRef == 0) {
|
||||
|
@ -318,11 +318,11 @@ static void tsdbTFileObjRemoveLC(STFileObj *fobj, bool remove_all) {
|
|||
}
|
||||
|
||||
int32_t tsdbTFileObjRemove(STFileObj *fobj) {
|
||||
taosThreadMutexLock(&fobj->mutex);
|
||||
(void)taosThreadMutexLock(&fobj->mutex);
|
||||
ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0);
|
||||
fobj->state = TSDB_FSTATE_DEAD;
|
||||
int32_t nRef = --fobj->ref;
|
||||
taosThreadMutexUnlock(&fobj->mutex);
|
||||
(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
if (nRef == 0) {
|
||||
tsdbTFileObjRemoveLC(fobj, true);
|
||||
|
@ -332,11 +332,11 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) {
|
|||
}
|
||||
|
||||
int32_t tsdbTFileObjRemoveUpdateLC(STFileObj *fobj) {
|
||||
taosThreadMutexLock(&fobj->mutex);
|
||||
(void)taosThreadMutexLock(&fobj->mutex);
|
||||
ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0);
|
||||
fobj->state = TSDB_FSTATE_DEAD;
|
||||
int32_t nRef = --fobj->ref;
|
||||
taosThreadMutexUnlock(&fobj->mutex);
|
||||
(void)taosThreadMutexUnlock(&fobj->mutex);
|
||||
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
if (nRef == 0) {
|
||||
tsdbTFileObjRemoveLC(fobj, false);
|
||||
|
|
|
@ -375,13 +375,13 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
|
|||
// edit file system
|
||||
TAOS_CHECK_GOTO(tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE), &lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&merger->tsdb->mutex);
|
||||
code = tsdbFSEditCommit(merger->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -478,10 +478,10 @@ _exit:
|
|||
static int32_t tsdbMergeGetFSet(SMerger *merger) {
|
||||
STFileSet *fset;
|
||||
|
||||
taosThreadMutexLock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&merger->tsdb->mutex);
|
||||
tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
|
||||
if (fset == NULL) {
|
||||
taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -489,10 +489,10 @@ static int32_t tsdbMergeGetFSet(SMerger *merger) {
|
|||
|
||||
int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&merger->tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -105,10 +105,10 @@ int32_t tsdbClose(STsdb **pTsdb) {
|
|||
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
|
||||
pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2,
|
||||
pdb->keepCfg.keepTimeOffset);
|
||||
taosThreadMutexLock(&(*pTsdb)->mutex);
|
||||
(void)taosThreadMutexLock(&(*pTsdb)->mutex);
|
||||
tsdbMemTableDestroy((*pTsdb)->mem, true);
|
||||
(*pTsdb)->mem = NULL;
|
||||
taosThreadMutexUnlock(&(*pTsdb)->mutex);
|
||||
(void)taosThreadMutexUnlock(&(*pTsdb)->mutex);
|
||||
|
||||
tsdbCloseFS(&(*pTsdb)->pFS);
|
||||
tsdbCloseCache(*pTsdb);
|
||||
|
|
|
@ -209,15 +209,15 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
|
|||
if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
|
||||
TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&rtner->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&rtner->tsdb->mutex);
|
||||
|
||||
code = tsdbFSEditCommit(rtner->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
|
||||
|
||||
TARRAY2_DESTROY(&rtner->fopArr, NULL);
|
||||
}
|
||||
|
@ -315,13 +315,13 @@ static int32_t tsdbRetention(void *arg) {
|
|||
};
|
||||
|
||||
// begin task
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset);
|
||||
if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
|
||||
// do retention
|
||||
if (rtner.fset) {
|
||||
|
@ -336,9 +336,9 @@ static int32_t tsdbRetention(void *arg) {
|
|||
|
||||
_exit:
|
||||
if (rtner.fset) {
|
||||
taosThreadMutexLock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexLock(&pTsdb->mutex);
|
||||
tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
|
||||
taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&pTsdb->mutex);
|
||||
}
|
||||
|
||||
// clear resources
|
||||
|
@ -387,9 +387,9 @@ _exit:
|
|||
|
||||
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
code = tsdbAsyncRetentionImpl(tsdb, now, false);
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -735,9 +735,9 @@ int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&tsdb->mutex);
|
||||
code = tsdbAsyncRetentionImpl(tsdb, now, true);
|
||||
taosThreadMutexUnlock(&tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&tsdb->mutex);
|
||||
|
||||
if (code) {
|
||||
tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
|
||||
|
|
|
@ -351,7 +351,7 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&fs->tsdb->mutex);
|
||||
STFileSet* fset;
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
STsdbFSetPartition* pItem = NULL;
|
||||
|
@ -364,7 +364,7 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) {
|
|||
code = TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
|
||||
if (code) {
|
||||
TARRAY2_DESTROY(pList, tsdbFSetPartitionClear);
|
||||
|
|
|
@ -1127,17 +1127,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
|
|||
code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
taosThreadMutexLock(&writer[0]->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&writer[0]->tsdb->mutex);
|
||||
|
||||
code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
|
||||
|
||||
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
}
|
||||
|
||||
tsdbIterMergerClose(&writer[0]->ctx->tombIterMerger);
|
||||
|
|
|
@ -485,17 +485,17 @@ int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** writer, int8_t rollback) {
|
|||
code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
taosThreadMutexLock(&writer[0]->tsdb->mutex);
|
||||
(void)taosThreadMutexLock(&writer[0]->tsdb->mutex);
|
||||
|
||||
code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
|
||||
|
||||
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
(void)taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
}
|
||||
|
||||
TARRAY2_DESTROY(writer[0]->fopArr, NULL);
|
||||
|
|
|
@ -213,7 +213,7 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
setThreadName(async->label);
|
||||
|
||||
for (;;) {
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
// finish last running task
|
||||
if (worker->runningTask != NULL) {
|
||||
|
@ -228,7 +228,7 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
}
|
||||
worker->state = EVA_WORKER_STATE_STOP;
|
||||
async->numLaunchWorkers--;
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -268,7 +268,7 @@ static void *vnodeAsyncLoop(void *arg) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
// do run the task
|
||||
worker->runningTask->execute(worker->runningTask->arg);
|
||||
|
@ -387,16 +387,16 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
|
|||
}
|
||||
|
||||
// set stop and broadcast
|
||||
taosThreadMutexLock(&(*async)->mutex);
|
||||
(void)taosThreadMutexLock(&(*async)->mutex);
|
||||
(*async)->stop = true;
|
||||
taosThreadCondBroadcast(&(*async)->hasTask);
|
||||
taosThreadMutexUnlock(&(*async)->mutex);
|
||||
(void)taosThreadMutexUnlock(&(*async)->mutex);
|
||||
|
||||
// join all workers
|
||||
for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
|
||||
taosThreadMutexLock(&(*async)->mutex);
|
||||
(void)taosThreadMutexLock(&(*async)->mutex);
|
||||
EVWorkerState state = (*async)->workers[i].state;
|
||||
taosThreadMutexUnlock(&(*async)->mutex);
|
||||
(void)taosThreadMutexUnlock(&(*async)->mutex);
|
||||
|
||||
if (state == EVA_WORKER_STATE_UINIT) {
|
||||
continue;
|
||||
|
@ -504,7 +504,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
taosThreadCondInit(&task->waitCond, NULL);
|
||||
|
||||
// schedule task
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
if (channelID->id == 0) {
|
||||
task->channel = NULL;
|
||||
|
@ -514,7 +514,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
};
|
||||
vHashGet(async->channelTable, &channel, (void **)&task->channel);
|
||||
if (task->channel == NULL) {
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
taosThreadCondDestroy(&task->waitCond);
|
||||
taosMemoryFree(task);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -526,7 +526,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
// add task to hash table
|
||||
int32_t ret = vHashPut(async->taskTable, task);
|
||||
if (ret != 0) {
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
taosThreadCondDestroy(&task->waitCond);
|
||||
taosMemoryFree(task);
|
||||
return ret;
|
||||
|
@ -580,7 +580,7 @@ int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*exec
|
|||
task->prev->next = task;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
if (taskID != NULL) {
|
||||
taskID->async = channelID->async;
|
||||
|
@ -601,7 +601,7 @@ int32_t vnodeAWait(SVATaskID *taskID) {
|
|||
.taskId = taskID->id,
|
||||
};
|
||||
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
if (task) {
|
||||
|
@ -615,7 +615,7 @@ int32_t vnodeAWait(SVATaskID *taskID) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -634,7 +634,7 @@ int32_t vnodeACancel(SVATaskID *taskID) {
|
|||
void (*cancel)(void *) = NULL;
|
||||
void *arg = NULL;
|
||||
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
vHashGet(async->taskTable, &task2, (void **)&task);
|
||||
if (task) {
|
||||
|
@ -649,7 +649,7 @@ int32_t vnodeACancel(SVATaskID *taskID) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
if (cancel) {
|
||||
cancel(arg);
|
||||
|
@ -663,12 +663,12 @@ int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
SVAsync *async = vnodeAsyncs[asyncID];
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
async->numWorkers = numWorkers;
|
||||
if (async->numIdleWorkers > 0) {
|
||||
taosThreadCondBroadcast(&async->hasTask);
|
||||
}
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -693,14 +693,14 @@ int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
|
|||
channel->scheduled = NULL;
|
||||
|
||||
// register channel
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
channel->channelId = channelID->id = ++async->nextChannelId;
|
||||
|
||||
// add to hash table
|
||||
int32_t ret = vHashPut(async->channelTable, channel);
|
||||
if (ret != 0) {
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
taosMemoryFree(channel);
|
||||
return ret;
|
||||
}
|
||||
|
@ -713,7 +713,7 @@ int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
|
|||
|
||||
async->numChannels++;
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
|
||||
channelID->async = asyncID;
|
||||
return 0;
|
||||
|
@ -734,7 +734,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&async->mutex);
|
||||
(void)taosThreadMutexLock(&async->mutex);
|
||||
|
||||
vHashGet(async->channelTable, &channel2, (void **)&channel);
|
||||
if (channel) {
|
||||
|
@ -793,7 +793,7 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&async->mutex);
|
||||
(void)taosThreadMutexUnlock(&async->mutex);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
|
||||
SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
|
||||
cancel->cancel(cancel->arg);
|
||||
|
|
|
@ -242,7 +242,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
|
|||
|
||||
SVnode *pVnode = pPool->pVnode;
|
||||
|
||||
if (proactive) taosThreadMutexLock(&pVnode->mutex);
|
||||
if (proactive) {
|
||||
(void)taosThreadMutexLock(&pVnode->mutex);
|
||||
}
|
||||
|
||||
if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
|
||||
|
||||
|
@ -267,12 +269,14 @@ void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
|
|||
vnodeBufPoolAddToFreeList(pPool);
|
||||
|
||||
_exit:
|
||||
if (proactive) taosThreadMutexUnlock(&pVnode->mutex);
|
||||
if (proactive) {
|
||||
(void)taosThreadMutexUnlock(&pVnode->mutex);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
||||
taosThreadMutexLock(&pPool->mutex);
|
||||
(void)taosThreadMutexLock(&pPool->mutex);
|
||||
|
||||
pQNode->pNext = pPool->qList.pNext;
|
||||
pQNode->ppNext = &pPool->qList.pNext;
|
||||
|
@ -280,20 +284,24 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
|
|||
pPool->qList.pNext = pQNode;
|
||||
pPool->nQuery++;
|
||||
|
||||
taosThreadMutexUnlock(&pPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pPool->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (proactive) taosThreadMutexLock(&pPool->mutex);
|
||||
if (proactive) {
|
||||
(void)taosThreadMutexLock(&pPool->mutex);
|
||||
}
|
||||
|
||||
pQNode->pNext->ppNext = pQNode->ppNext;
|
||||
*pQNode->ppNext = pQNode->pNext;
|
||||
pPool->nQuery--;
|
||||
|
||||
if (proactive) taosThreadMutexUnlock(&pPool->mutex);
|
||||
if (proactive) {
|
||||
(void)taosThreadMutexUnlock(&pPool->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
|
||||
|
@ -303,7 +311,7 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
|
|||
|
||||
vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
|
||||
|
||||
taosThreadMutexLock(&pPool->mutex);
|
||||
(void)taosThreadMutexLock(&pPool->mutex);
|
||||
|
||||
SQueryNode *pNode = pPool->qList.pNext;
|
||||
while (pNode != &pPool->qList) {
|
||||
|
@ -319,6 +327,6 @@ int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
|
|||
}
|
||||
|
||||
_exit:
|
||||
taosThreadMutexUnlock(&pPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pPool->mutex);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
taosThreadMutexLock(&pVnode->mutex);
|
||||
(void)taosThreadMutexLock(&pVnode->mutex);
|
||||
|
||||
int32_t nTry = 0;
|
||||
for (;;) {
|
||||
|
@ -110,7 +110,7 @@ static int32_t vnodeGetBufPoolToUse(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
_exit:
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pVnode->mutex);
|
||||
if (code) {
|
||||
vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
|
|||
bool diskAvail = osDataSpaceAvailable();
|
||||
bool needCommit = false;
|
||||
|
||||
taosThreadMutexLock(&pVnode->mutex);
|
||||
(void)taosThreadMutexLock(&pVnode->mutex);
|
||||
if (pVnode->inUse && diskAvail) {
|
||||
needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) ||
|
||||
(atExit && (pVnode->inUse->size > 0 || pVnode->pMeta->changed ||
|
||||
|
@ -162,7 +162,7 @@ int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
|
|||
TD_VID(pVnode), needCommit, diskAvail, pVnode->inUse ? pVnode->inUse->size : 0,
|
||||
pVnode->inUse ? pVnode->inUse->node.size : 0, pVnode->pMeta->changed, pVnode->state.applied,
|
||||
pVnode->state.committed);
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pVnode->mutex);
|
||||
return needCommit;
|
||||
}
|
||||
|
||||
|
@ -299,11 +299,11 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
code = smaPrepareAsyncCommit(pVnode->pSma);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
taosThreadMutexLock(&pVnode->mutex);
|
||||
(void)taosThreadMutexLock(&pVnode->mutex);
|
||||
ASSERT(pVnode->onCommit == NULL);
|
||||
pVnode->onCommit = pVnode->inUse;
|
||||
pVnode->inUse = NULL;
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pVnode->mutex);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -316,7 +316,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
static void vnodeReturnBufPool(SVnode *pVnode) {
|
||||
taosThreadMutexLock(&pVnode->mutex);
|
||||
(void)taosThreadMutexLock(&pVnode->mutex);
|
||||
|
||||
SVBufPool *pPool = pVnode->onCommit;
|
||||
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
|
||||
|
@ -340,7 +340,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pVnode->mutex);
|
||||
(void)taosThreadMutexUnlock(&pVnode->mutex);
|
||||
}
|
||||
static int32_t vnodeCommit(void *arg) {
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -34,7 +34,7 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
|||
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
vGTrace("vgId:%d, msg:%p post block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
|
||||
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
|
||||
|
@ -43,7 +43,7 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
|||
pVnode->blockSeq = 0;
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -113,7 +113,7 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
|
|||
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
|
||||
int64_t seq = 0;
|
||||
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
|
||||
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
|
||||
if (wait) {
|
||||
|
@ -122,7 +122,7 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
|
|||
pVnode->blockSec = taosGetTimestampSec();
|
||||
pVnode->blockSeq = seq;
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
|
||||
if (code > 0) {
|
||||
vnodeHandleWriteMsg(pVnode, pMsg);
|
||||
|
@ -171,14 +171,14 @@ static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool
|
|||
if (*arrSize <= 0) return;
|
||||
SRpcMsg *pLastMsg = pMsgArr[*arrSize - 1];
|
||||
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
|
||||
bool wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType));
|
||||
if (wait) {
|
||||
ASSERT(!pVnode->blocked);
|
||||
pVnode->blocked = true;
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
|
||||
if (code > 0) {
|
||||
for (int32_t i = 0; i < *arrSize; ++i) {
|
||||
|
@ -598,13 +598,13 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
|||
SVnode *pVnode = pFsm->data;
|
||||
vInfo("vgId:%d, become follower", pVnode->config.vgId);
|
||||
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
pVnode->blocked = false;
|
||||
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
|
||||
if (pVnode->pTq) {
|
||||
tqUpdateNodeStage(pVnode->pTq, false);
|
||||
|
@ -616,13 +616,13 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
|
|||
SVnode *pVnode = pFsm->data;
|
||||
vInfo("vgId:%d, become learner", pVnode->config.vgId);
|
||||
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
pVnode->blocked = false;
|
||||
vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
}
|
||||
|
||||
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
|
||||
|
@ -746,13 +746,13 @@ void vnodeSyncPreClose(SVnode *pVnode) {
|
|||
syncLeaderTransfer(pVnode->sync);
|
||||
syncPreStop(pVnode->sync);
|
||||
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
|
||||
pVnode->blocked = false;
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
}
|
||||
|
||||
void vnodeSyncPostClose(SVnode *pVnode) {
|
||||
|
@ -767,7 +767,7 @@ void vnodeSyncClose(SVnode *pVnode) {
|
|||
|
||||
void vnodeSyncCheckTimeout(SVnode *pVnode) {
|
||||
vTrace("vgId:%d, check sync timeout msg", pVnode->config.vgId);
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
(void)taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
int32_t curSec = taosGetTimestampSec();
|
||||
int32_t delta = curSec - pVnode->blockSec;
|
||||
|
@ -788,7 +788,7 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
|
|||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
(void)taosThreadMutexUnlock(&pVnode->lock);
|
||||
}
|
||||
|
||||
bool vnodeIsRoleLeader(SVnode *pVnode) {
|
||||
|
|
|
@ -28,7 +28,7 @@ char *tsMonSlowLogUri = "/slow-sql-detail-batch";
|
|||
char *tsMonFwBasicUri = "/taosd-cluster-basic";
|
||||
|
||||
void monRecordLog(int64_t ts, ELogLevel level, const char *content) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
int32_t size = taosArrayGetSize(tsMonitor.logs);
|
||||
if (size < tsMonitor.cfg.maxLogs) {
|
||||
SMonLogItem item = {.ts = ts, .level = level};
|
||||
|
@ -37,11 +37,11 @@ void monRecordLog(int64_t ts, ELogLevel level, const char *content) {
|
|||
tstrncpy(pItem->content, content, MON_LOG_LEN);
|
||||
}
|
||||
}
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
}
|
||||
|
||||
int32_t monGetLogs(SMonLogs *logs) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
logs->logs = taosArrayDup(tsMonitor.logs, NULL);
|
||||
logs->numOfInfoLogs = tsNumOfInfoLogs;
|
||||
logs->numOfErrorLogs = tsNumOfErrorLogs;
|
||||
|
@ -52,7 +52,7 @@ int32_t monGetLogs(SMonLogs *logs) {
|
|||
tsNumOfDebugLogs = 0;
|
||||
tsNumOfTraceLogs = 0;
|
||||
taosArrayClear(tsMonitor.logs);
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
if (logs->logs == NULL) {
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -60,44 +60,44 @@ int32_t monGetLogs(SMonLogs *logs) {
|
|||
}
|
||||
|
||||
void monSetDmInfo(SMonDmInfo *pInfo) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&tsMonitor.dmInfo, pInfo, sizeof(SMonDmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
memset(pInfo, 0, sizeof(SMonDmInfo));
|
||||
}
|
||||
|
||||
void monSetMmInfo(SMonMmInfo *pInfo) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&tsMonitor.mmInfo, pInfo, sizeof(SMonMmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
memset(pInfo, 0, sizeof(SMonMmInfo));
|
||||
}
|
||||
|
||||
void monSetVmInfo(SMonVmInfo *pInfo) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&tsMonitor.vmInfo, pInfo, sizeof(SMonVmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
memset(pInfo, 0, sizeof(SMonVmInfo));
|
||||
}
|
||||
|
||||
void monSetQmInfo(SMonQmInfo *pInfo) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&tsMonitor.qmInfo, pInfo, sizeof(SMonQmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
memset(pInfo, 0, sizeof(SMonQmInfo));
|
||||
}
|
||||
|
||||
void monSetSmInfo(SMonSmInfo *pInfo) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&tsMonitor.smInfo, pInfo, sizeof(SMonSmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
memset(pInfo, 0, sizeof(SMonSmInfo));
|
||||
}
|
||||
|
||||
void monSetBmInfo(SMonBmInfo *pInfo) {
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&tsMonitor.bmInfo, pInfo, sizeof(SMonBmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
memset(pInfo, 0, sizeof(SMonBmInfo));
|
||||
}
|
||||
|
||||
|
@ -153,7 +153,7 @@ static SMonInfo *monCreateMonitorInfo() {
|
|||
|
||||
monGetLogs(&pMonitor->log);
|
||||
|
||||
taosThreadMutexLock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexLock(&tsMonitor.lock);
|
||||
memcpy(&pMonitor->dmInfo, &tsMonitor.dmInfo, sizeof(SMonDmInfo));
|
||||
memcpy(&pMonitor->mmInfo, &tsMonitor.mmInfo, sizeof(SMonMmInfo));
|
||||
memcpy(&pMonitor->vmInfo, &tsMonitor.vmInfo, sizeof(SMonVmInfo));
|
||||
|
@ -166,7 +166,7 @@ static SMonInfo *monCreateMonitorInfo() {
|
|||
memset(&tsMonitor.smInfo, 0, sizeof(SMonSmInfo));
|
||||
memset(&tsMonitor.qmInfo, 0, sizeof(SMonQmInfo));
|
||||
memset(&tsMonitor.bmInfo, 0, sizeof(SMonBmInfo));
|
||||
taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
(void)taosThreadMutexUnlock(&tsMonitor.lock);
|
||||
|
||||
pMonitor->pJson = tjsonCreateObject();
|
||||
if (pMonitor->pJson == NULL || pMonitor->log.logs == NULL) {
|
||||
|
|
|
@ -76,9 +76,9 @@ static void httpHandleReq(SHttpMsg* msg);
|
|||
static void httpHandleQuit(SHttpMsg* msg);
|
||||
static int32_t httpSendQuit(SHttpModule* http, int64_t chanId);
|
||||
|
||||
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
|
||||
static void httpDestroyMsg(SHttpMsg* msg);
|
||||
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg);
|
||||
static void httpDestroyMsg(SHttpMsg* msg);
|
||||
|
||||
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
|
||||
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
|
||||
|
@ -91,27 +91,27 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
|
|||
|
||||
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
|
||||
|
||||
EHttpCompFlag flag) {
|
||||
EHttpCompFlag flag) {
|
||||
int32_t code = 0;
|
||||
int32_t len = 0;
|
||||
if (flag == HTTP_FLAT) {
|
||||
len = snprintf(pHead, headLen,
|
||||
"POST %s HTTP/1.1\n"
|
||||
"Host: %s\n"
|
||||
"Content-Type: application/json\n"
|
||||
"Content-Length: %d\n\n",
|
||||
uri, server, contLen);
|
||||
"POST %s HTTP/1.1\n"
|
||||
"Host: %s\n"
|
||||
"Content-Type: application/json\n"
|
||||
"Content-Length: %d\n\n",
|
||||
uri, server, contLen);
|
||||
if (len < 0 || len >= headLen) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
} else if (flag == HTTP_GZIP) {
|
||||
len = snprintf(pHead, headLen,
|
||||
"POST %s HTTP/1.1\n"
|
||||
"Host: %s\n"
|
||||
"Content-Type: application/json\n"
|
||||
"Content-Encoding: gzip\n"
|
||||
"Content-Length: %d\n\n",
|
||||
uri, server, contLen);
|
||||
"POST %s HTTP/1.1\n"
|
||||
"Host: %s\n"
|
||||
"Content-Type: application/json\n"
|
||||
"Content-Encoding: gzip\n"
|
||||
"Content-Length: %d\n\n",
|
||||
uri, server, contLen);
|
||||
if (len < 0 || len >= headLen) {
|
||||
code = TSDB_CODE_OUT_OF_RANGE;
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {
|
|||
void* pDest = taosMemoryMalloc(destLen);
|
||||
|
||||
if (pDest == NULL) {
|
||||
code= TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ _OVER:
|
|||
|
||||
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
||||
uint32_t ip = 0;
|
||||
int32_t code = taosGetIpv4FromFqdn(server, &ip);
|
||||
int32_t code = taosGetIpv4FromFqdn(server, &ip);
|
||||
if (code) {
|
||||
tError("http-report failed to resolving domain names: %s", server);
|
||||
return TSDB_CODE_RPC_FQDN_ERROR;
|
||||
|
@ -293,7 +293,7 @@ static void httpAsyncCb(uv_async_t* handle) {
|
|||
static int32_t BATCH_SIZE = 5;
|
||||
int32_t count = 0;
|
||||
|
||||
taosThreadMutexLock(&item->mtx);
|
||||
(void)taosThreadMutexLock(&item->mtx);
|
||||
httpMayDiscardMsg(http, item);
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) {
|
||||
|
@ -301,7 +301,7 @@ static void httpAsyncCb(uv_async_t* handle) {
|
|||
QUEUE_REMOVE(h);
|
||||
QUEUE_PUSH(&wq, h);
|
||||
}
|
||||
taosThreadMutexUnlock(&item->mtx);
|
||||
(void)taosThreadMutexUnlock(&item->mtx);
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
queue* h = QUEUE_HEAD(&wq);
|
||||
|
@ -636,8 +636,8 @@ void httpModuleDestroy2(SHttpModule* http) {
|
|||
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
|
||||
int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
|
||||
SHttpModule* load = NULL;
|
||||
SHttpMsg *msg = NULL;
|
||||
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg);
|
||||
SHttpMsg* msg = NULL;
|
||||
int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg);
|
||||
if (code != 0) {
|
||||
goto _ERROR;
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
|
|||
#if defined(LINUX)
|
||||
taosThreadRwlockWrlock(&pCacheObj->lock);
|
||||
#else
|
||||
taosThreadMutexLock(&pCacheObj->lock);
|
||||
(void)taosThreadMutexLock(&pCacheObj->lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
|
|||
#if defined(LINUX)
|
||||
taosThreadRwlockUnlock(&pCacheObj->lock);
|
||||
#else
|
||||
taosThreadMutexUnlock(&pCacheObj->lock);
|
||||
(void)taosThreadMutexUnlock(&pCacheObj->lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -168,9 +168,9 @@ static void doInitRefreshThread(void) {
|
|||
TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
|
||||
taosThreadOnce(&cacheThreadInit, doInitRefreshThread);
|
||||
|
||||
taosThreadMutexLock(&guard);
|
||||
(void)taosThreadMutexLock(&guard);
|
||||
(void)taosArrayPush(pCacheArrayList, &pCacheObj);
|
||||
taosThreadMutexUnlock(&guard);
|
||||
(void)taosThreadMutexUnlock(&guard);
|
||||
|
||||
return cacheRefreshWorker;
|
||||
}
|
||||
|
@ -840,19 +840,19 @@ void *taosCacheTimedRefresh(void *handle) {
|
|||
goto _end;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&guard);
|
||||
(void)taosThreadMutexLock(&guard);
|
||||
size_t size = taosArrayGetSize(pCacheArrayList);
|
||||
taosThreadMutexUnlock(&guard);
|
||||
(void)taosThreadMutexUnlock(&guard);
|
||||
|
||||
count += 1;
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
taosThreadMutexLock(&guard);
|
||||
(void)taosThreadMutexLock(&guard);
|
||||
SCacheObj *pCacheObj = taosArrayGetP(pCacheArrayList, i);
|
||||
|
||||
if (pCacheObj == NULL) {
|
||||
uError("object is destroyed. ignore and try next");
|
||||
taosThreadMutexUnlock(&guard);
|
||||
(void)taosThreadMutexUnlock(&guard);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -864,11 +864,11 @@ void *taosCacheTimedRefresh(void *handle) {
|
|||
uDebug("%s is destroying, remove it from refresh list, remain cache obj:%" PRIzu, pCacheObj->name, size);
|
||||
pCacheObj->deleting = 0; // reset the deleting flag to enable pCacheObj to continue releasing resources.
|
||||
|
||||
taosThreadMutexUnlock(&guard);
|
||||
(void)taosThreadMutexUnlock(&guard);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&guard);
|
||||
(void)taosThreadMutexUnlock(&guard);
|
||||
|
||||
if ((count % pCacheObj->checkTick) != 0) {
|
||||
continue;
|
||||
|
|
|
@ -44,7 +44,7 @@ int32_t taosAllocateId(id_pool_t *pIdPool) {
|
|||
if (pIdPool == NULL) return -1;
|
||||
|
||||
int32_t slot = -1;
|
||||
taosThreadMutexLock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexLock(&pIdPool->mutex);
|
||||
|
||||
if (pIdPool->numOfFree > 0) {
|
||||
for (int32_t i = 0; i < pIdPool->maxId; ++i) {
|
||||
|
@ -58,14 +58,14 @@ int32_t taosAllocateId(id_pool_t *pIdPool) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
return slot + 1;
|
||||
}
|
||||
|
||||
void taosFreeId(id_pool_t *pIdPool, int32_t id) {
|
||||
if (pIdPool == NULL) return;
|
||||
|
||||
taosThreadMutexLock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexLock(&pIdPool->mutex);
|
||||
|
||||
int32_t slot = (id - 1) % pIdPool->maxId;
|
||||
if (pIdPool->freeList[slot]) {
|
||||
|
@ -73,7 +73,7 @@ void taosFreeId(id_pool_t *pIdPool, int32_t id) {
|
|||
pIdPool->numOfFree++;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
}
|
||||
|
||||
void taosIdPoolCleanUp(id_pool_t *pIdPool) {
|
||||
|
@ -91,16 +91,16 @@ void taosIdPoolCleanUp(id_pool_t *pIdPool) {
|
|||
}
|
||||
|
||||
int32_t taosIdPoolNumOfUsed(id_pool_t *pIdPool) {
|
||||
taosThreadMutexLock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexLock(&pIdPool->mutex);
|
||||
int32_t ret = pIdPool->maxId - pIdPool->numOfFree;
|
||||
taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool taosIdPoolMarkStatus(id_pool_t *pIdPool, int32_t id) {
|
||||
bool ret = false;
|
||||
taosThreadMutexLock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexLock(&pIdPool->mutex);
|
||||
|
||||
int32_t slot = (id - 1) % pIdPool->maxId;
|
||||
if (!pIdPool->freeList[slot]) {
|
||||
|
@ -111,7 +111,7 @@ bool taosIdPoolMarkStatus(id_pool_t *pIdPool, int32_t id) {
|
|||
ret = false;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -125,7 +125,7 @@ int32_t taosUpdateIdPool(id_pool_t *pIdPool, int32_t maxId) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexLock(&pIdPool->mutex);
|
||||
|
||||
memcpy(idList, pIdPool->freeList, sizeof(bool) * pIdPool->maxId);
|
||||
pIdPool->numOfFree += (maxId - pIdPool->maxId);
|
||||
|
@ -135,15 +135,15 @@ int32_t taosUpdateIdPool(id_pool_t *pIdPool, int32_t maxId) {
|
|||
pIdPool->freeList = idList;
|
||||
taosMemoryFree(oldIdList);
|
||||
|
||||
taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t taosIdPoolMaxSize(id_pool_t *pIdPool) {
|
||||
taosThreadMutexLock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexLock(&pIdPool->mutex);
|
||||
int32_t ret = pIdPool->maxId;
|
||||
taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pIdPool->mutex);
|
||||
|
||||
return ret;
|
||||
}
|
|
@ -362,7 +362,7 @@ static void *taosThreadToCloseOldFile(void *param) {
|
|||
}
|
||||
|
||||
static int32_t taosOpenNewLogFile() {
|
||||
taosThreadMutexLock(&tsLogObj.logMutex);
|
||||
(void)taosThreadMutexLock(&tsLogObj.logMutex);
|
||||
|
||||
if (tsLogObj.lines > tsNumOfLogLines && tsLogObj.openInProgress == 0) {
|
||||
tsLogObj.openInProgress = 1;
|
||||
|
@ -378,7 +378,7 @@ static int32_t taosOpenNewLogFile() {
|
|||
taosThreadAttrDestroy(&attr);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsLogObj.logMutex);
|
||||
(void)taosThreadMutexUnlock(&tsLogObj.logMutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -719,7 +719,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
|
|||
|
||||
if (pLogBuf == NULL || pLogBuf->stop) return -1;
|
||||
|
||||
taosThreadMutexLock(&LOG_BUF_MUTEX(pLogBuf));
|
||||
(void)taosThreadMutexLock(&LOG_BUF_MUTEX(pLogBuf));
|
||||
start = LOG_BUF_START(pLogBuf);
|
||||
end = LOG_BUF_END(pLogBuf);
|
||||
|
||||
|
@ -733,7 +733,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
|
|||
if (remainSize <= msgLen || ((lostLine > 0) && (remainSize <= (msgLen + tmpBufLen)))) {
|
||||
lostLine++;
|
||||
tsAsyncLogLostLines++;
|
||||
taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
|
||||
(void)taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -754,7 +754,7 @@ static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msg
|
|||
}
|
||||
*/
|
||||
|
||||
taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
|
||||
(void)taosThreadMutexUnlock(&LOG_BUF_MUTEX(pLogBuf));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -86,11 +86,11 @@ void taosCloseQueue(STaosQueue *queue) {
|
|||
STaosQnode *pTemp;
|
||||
STaosQset *qset;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
STaosQnode *pNode = queue->head;
|
||||
queue->head = NULL;
|
||||
qset = queue->qset;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
if (queue->qset) {
|
||||
taosRemoveFromQset(qset, queue);
|
||||
|
@ -112,11 +112,11 @@ bool taosQueueEmpty(STaosQueue *queue) {
|
|||
if (queue == NULL) return true;
|
||||
|
||||
bool empty = false;
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
|
||||
empty = true;
|
||||
}
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
return empty;
|
||||
}
|
||||
|
@ -124,26 +124,26 @@ bool taosQueueEmpty(STaosQueue *queue) {
|
|||
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
|
||||
if (queue == NULL) return;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
queue->numOfItems -= items;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
}
|
||||
|
||||
int32_t taosQueueItemSize(STaosQueue *queue) {
|
||||
if (queue == NULL) return 0;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
int32_t numOfItems = queue->numOfItems;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
|
||||
return numOfItems;
|
||||
}
|
||||
|
||||
int64_t taosQueueMemorySize(STaosQueue *queue) {
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
int64_t memOfItems = queue->memOfItems;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
return memOfItems;
|
||||
}
|
||||
|
||||
|
@ -198,19 +198,19 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
|
|||
pNode->timestamp = taosGetTimestampUs();
|
||||
pNode->next = NULL;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
|
||||
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
|
||||
uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
|
||||
queue->memLimit, tstrerror(code));
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
return code;
|
||||
} else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
|
||||
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
|
||||
uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
|
||||
queue->itemLimit, tstrerror(code));
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -229,7 +229,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
|
|||
|
||||
uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
if (queue->qset) {
|
||||
tsem_post(&queue->qset->sem);
|
||||
|
@ -241,7 +241,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
|||
STaosQnode *pNode = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
|
||||
if (queue->head) {
|
||||
pNode = queue->head;
|
||||
|
@ -260,7 +260,7 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
|
|||
queue->memOfItems);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
|||
int32_t numOfItems = 0;
|
||||
bool empty;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
|
||||
empty = queue->head == NULL;
|
||||
if (!empty) {
|
||||
|
@ -305,7 +305,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
// if source queue is empty, we set destination qall to empty too.
|
||||
if (empty) {
|
||||
|
@ -355,7 +355,7 @@ void taosCloseQset(STaosQset *qset) {
|
|||
if (qset == NULL) return;
|
||||
|
||||
// remove all the queues from qset
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
(void)taosThreadMutexLock(&qset->mutex);
|
||||
while (qset->head) {
|
||||
STaosQueue *queue = qset->head;
|
||||
qset->head = qset->head->next;
|
||||
|
@ -363,7 +363,7 @@ void taosCloseQset(STaosQset *qset) {
|
|||
queue->qset = NULL;
|
||||
queue->next = NULL;
|
||||
}
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
(void)taosThreadMutexUnlock(&qset->mutex);
|
||||
|
||||
taosThreadMutexDestroy(&qset->mutex);
|
||||
tsem_destroy(&qset->sem);
|
||||
|
@ -382,19 +382,19 @@ void taosQsetThreadResume(STaosQset *qset) {
|
|||
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
|
||||
if (queue->qset) return -1;
|
||||
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
(void)taosThreadMutexLock(&qset->mutex);
|
||||
|
||||
queue->next = qset->head;
|
||||
queue->ahandle = ahandle;
|
||||
qset->head = queue;
|
||||
qset->numOfQueues++;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
|
||||
queue->qset = qset;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
(void)taosThreadMutexUnlock(&qset->mutex);
|
||||
|
||||
uTrace("queue:%p is added into qset:%p", queue, qset);
|
||||
return 0;
|
||||
|
@ -403,7 +403,7 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
|
|||
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
|
||||
STaosQueue *tqueue = NULL;
|
||||
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
(void)taosThreadMutexLock(&qset->mutex);
|
||||
|
||||
if (qset->head) {
|
||||
if (qset->head == queue) {
|
||||
|
@ -427,15 +427,15 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
|
|||
if (qset->current == queue) qset->current = tqueue->next;
|
||||
qset->numOfQueues--;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
|
||||
queue->qset = NULL;
|
||||
queue->next = NULL;
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
(void)taosThreadMutexUnlock(&qset->mutex);
|
||||
|
||||
uDebug("queue:%p is removed from qset:%p", queue, qset);
|
||||
}
|
||||
|
@ -446,7 +446,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
|
|||
|
||||
tsem_wait(&qset->sem);
|
||||
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
(void)taosThreadMutexLock(&qset->mutex);
|
||||
|
||||
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
|
||||
if (qset->current == NULL) qset->current = qset->head;
|
||||
|
@ -455,7 +455,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
|
|||
if (queue == NULL) break;
|
||||
if (queue->head == NULL) continue;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
|
||||
if (queue->head) {
|
||||
pNode = queue->head;
|
||||
|
@ -475,11 +475,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
|
|||
queue->memOfItems);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
if (pNode) break;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
(void)taosThreadMutexUnlock(&qset->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -489,7 +489,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
|
|||
int32_t code = 0;
|
||||
|
||||
tsem_wait(&qset->sem);
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
(void)taosThreadMutexLock(&qset->mutex);
|
||||
|
||||
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
|
||||
if (qset->current == NULL) qset->current = qset->head;
|
||||
|
@ -498,7 +498,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
|
|||
if (queue == NULL) break;
|
||||
if (queue->head == NULL) continue;
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
(void)taosThreadMutexLock(&queue->mutex);
|
||||
|
||||
if (queue->head) {
|
||||
qall->current = queue->head;
|
||||
|
@ -526,12 +526,12 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
(void)taosThreadMutexUnlock(&queue->mutex);
|
||||
|
||||
if (code != 0) break;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
(void)taosThreadMutexUnlock(&qset->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -554,11 +554,11 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
|
|||
if (pItem == NULL) return;
|
||||
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
|
||||
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
(void)taosThreadMutexLock(&qset->mutex);
|
||||
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
|
||||
tsem_post(&qset->sem);
|
||||
}
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
(void)taosThreadMutexUnlock(&qset->mutex);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -76,7 +76,7 @@ int32_t taosOpenRef(int32_t max, RefFp fp) {
|
|||
return terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&tsRefMutex);
|
||||
(void)taosThreadMutexLock(&tsRefMutex);
|
||||
|
||||
for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
||||
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
|
||||
|
@ -105,7 +105,7 @@ int32_t taosOpenRef(int32_t max, RefFp fp) {
|
|||
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsRefMutex);
|
||||
(void)taosThreadMutexUnlock(&tsRefMutex);
|
||||
|
||||
return rsetId;
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ int32_t taosCloseRef(int32_t rsetId) {
|
|||
|
||||
pSet = tsRefSetList + rsetId;
|
||||
|
||||
taosThreadMutexLock(&tsRefMutex);
|
||||
(void)taosThreadMutexLock(&tsRefMutex);
|
||||
|
||||
if (pSet->state == TSDB_REF_STATE_ACTIVE) {
|
||||
pSet->state = TSDB_REF_STATE_DELETED;
|
||||
|
@ -131,7 +131,7 @@ int32_t taosCloseRef(int32_t rsetId) {
|
|||
uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsRefMutex);
|
||||
(void)taosThreadMutexUnlock(&tsRefMutex);
|
||||
|
||||
if (deleted) taosDecRsetCount(pSet);
|
||||
|
||||
|
@ -349,7 +349,7 @@ int32_t taosListRef() {
|
|||
SRefNode *pNode;
|
||||
int32_t num = 0;
|
||||
|
||||
taosThreadMutexLock(&tsRefMutex);
|
||||
(void)taosThreadMutexLock(&tsRefMutex);
|
||||
|
||||
for (int32_t i = 0; i < TSDB_REF_OBJECTS; ++i) {
|
||||
pSet = tsRefSetList + i;
|
||||
|
@ -369,7 +369,7 @@ int32_t taosListRef() {
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsRefMutex);
|
||||
(void)taosThreadMutexUnlock(&tsRefMutex);
|
||||
|
||||
return num;
|
||||
}
|
||||
|
@ -475,7 +475,7 @@ static void taosDecRsetCount(SRefSet *pSet) {
|
|||
|
||||
if (count > 0) return;
|
||||
|
||||
taosThreadMutexLock(&tsRefMutex);
|
||||
(void)taosThreadMutexLock(&tsRefMutex);
|
||||
|
||||
if (pSet->state != TSDB_REF_STATE_EMPTY) {
|
||||
pSet->state = TSDB_REF_STATE_EMPTY;
|
||||
|
@ -489,5 +489,5 @@ static void taosDecRsetCount(SRefSet *pSet) {
|
|||
uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&tsRefMutex);
|
||||
(void)taosThreadMutexUnlock(&tsRefMutex);
|
||||
}
|
||||
|
|
|
@ -230,7 +230,7 @@ static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
|
|||
timer->prev = NULL;
|
||||
timer->expireAt = taosGetMonotonicMs() + delay;
|
||||
|
||||
taosThreadMutexLock(&wheel->mutex);
|
||||
(void)taosThreadMutexLock(&wheel->mutex);
|
||||
|
||||
uint32_t idx = 0;
|
||||
if (timer->expireAt > wheel->nextScanAt) {
|
||||
|
@ -248,7 +248,7 @@ static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
|
|||
p->prev = timer;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&wheel->mutex);
|
||||
(void)taosThreadMutexUnlock(&wheel->mutex);
|
||||
}
|
||||
|
||||
static bool removeFromWheel(tmr_obj_t* timer) {
|
||||
|
@ -259,7 +259,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
|
|||
time_wheel_t* wheel = wheels + wheelIdx;
|
||||
|
||||
bool removed = false;
|
||||
taosThreadMutexLock(&wheel->mutex);
|
||||
(void)taosThreadMutexLock(&wheel->mutex);
|
||||
// other thread may modify timer->wheel, check again.
|
||||
if (timer->wheel < tListLen(wheels)) {
|
||||
if (timer->prev != NULL) {
|
||||
|
@ -277,7 +277,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
|
|||
timerDecRef(timer);
|
||||
removed = true;
|
||||
}
|
||||
taosThreadMutexUnlock(&wheel->mutex);
|
||||
(void)taosThreadMutexUnlock(&wheel->mutex);
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ static void taosTimerLoopFunc(int32_t signo) {
|
|||
|
||||
time_wheel_t* wheel = wheels + i;
|
||||
while (now >= wheel->nextScanAt) {
|
||||
taosThreadMutexLock(&wheel->mutex);
|
||||
(void)taosThreadMutexLock(&wheel->mutex);
|
||||
wheel->index = (wheel->index + 1) % wheel->size;
|
||||
tmr_obj_t* timer = wheel->slots[wheel->index];
|
||||
while (timer != NULL) {
|
||||
|
@ -407,7 +407,7 @@ static void taosTimerLoopFunc(int32_t signo) {
|
|||
timer = next;
|
||||
}
|
||||
wheel->nextScanAt += wheel->resolution;
|
||||
taosThreadMutexUnlock(&wheel->mutex);
|
||||
(void)taosThreadMutexUnlock(&wheel->mutex);
|
||||
}
|
||||
|
||||
addToExpired(expired);
|
||||
|
@ -594,13 +594,13 @@ void* taosTmrInit(int32_t maxNumOfTmrs, int32_t resolution, int32_t longest, con
|
|||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&tmrCtrlMutex);
|
||||
(void)taosThreadMutexLock(&tmrCtrlMutex);
|
||||
tmr_ctrl_t* ctrl = unusedTmrCtrl;
|
||||
if (ctrl != NULL) {
|
||||
unusedTmrCtrl = ctrl->next;
|
||||
numOfTmrCtrl++;
|
||||
}
|
||||
taosThreadMutexUnlock(&tmrCtrlMutex);
|
||||
(void)taosThreadMutexUnlock(&tmrCtrlMutex);
|
||||
|
||||
if (ctrl == NULL) {
|
||||
tmrError("%s too many timer controllers, failed to create timer controller.", label);
|
||||
|
@ -623,11 +623,11 @@ void taosTmrCleanUp(void* handle) {
|
|||
tmrDebug("%s timer controller is cleaned up.", ctrl->label);
|
||||
ctrl->label[0] = 0;
|
||||
|
||||
taosThreadMutexLock(&tmrCtrlMutex);
|
||||
(void)taosThreadMutexLock(&tmrCtrlMutex);
|
||||
ctrl->next = unusedTmrCtrl;
|
||||
numOfTmrCtrl--;
|
||||
unusedTmrCtrl = ctrl;
|
||||
taosThreadMutexUnlock(&tmrCtrlMutex);
|
||||
(void)taosThreadMutexUnlock(&tmrCtrlMutex);
|
||||
|
||||
tmrDebug("time controller's tmr ctrl size: %d", numOfTmrCtrl);
|
||||
if (numOfTmrCtrl <= 0) {
|
||||
|
|
|
@ -120,7 +120,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pool->mutex);
|
||||
(void)taosThreadMutexLock(&pool->mutex);
|
||||
taosSetQueueFp(queue, fp, NULL);
|
||||
taosAddIntoQset(pool->qset, queue, ahandle);
|
||||
|
||||
|
@ -146,7 +146,7 @@ STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
|||
} while (pool->num < pool->min);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pool->mutex);
|
||||
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
|
||||
return queue;
|
||||
|
@ -251,7 +251,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
|||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pool->mutex);
|
||||
(void)taosThreadMutexLock(&pool->mutex);
|
||||
taosSetQueueFp(queue, fp, NULL);
|
||||
taosAddIntoQset(pool->qset, queue, ahandle);
|
||||
|
||||
|
@ -267,7 +267,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
|||
uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
|
||||
taosMemoryFree(worker);
|
||||
taosCloseQueue(queue);
|
||||
taosThreadMutexUnlock(&pool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pool->mutex);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -294,7 +294,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
|
|||
curWorkerNum++;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pool->mutex);
|
||||
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
|
||||
return queue;
|
||||
|
@ -393,7 +393,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
|
|||
}
|
||||
|
||||
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
|
||||
taosThreadMutexLock(&pool->mutex);
|
||||
(void)taosThreadMutexLock(&pool->mutex);
|
||||
SWWorker *worker = pool->workers + pool->nextId;
|
||||
int32_t code = -1;
|
||||
STaosQueue *queue;
|
||||
|
@ -427,7 +427,7 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
|
|||
}
|
||||
|
||||
_OVER:
|
||||
taosThreadMutexUnlock(&pool->mutex);
|
||||
(void)taosThreadMutexUnlock(&pool->mutex);
|
||||
|
||||
if (code) {
|
||||
if (queue != NULL) taosCloseQueue(queue);
|
||||
|
@ -675,9 +675,9 @@ static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
|
|||
while (waiting > 0) {
|
||||
int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
|
||||
if (waitingNew == waiting) {
|
||||
taosThreadMutexLock(&pPool->waitingAfterBlockLock);
|
||||
(void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
|
||||
taosThreadCondSignal(&pPool->waitingAfterBlockCond);
|
||||
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
|
||||
ret = true;
|
||||
break;
|
||||
}
|
||||
|
@ -693,9 +693,9 @@ static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
|
|||
while (waiting > 0) {
|
||||
int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
|
||||
if (waitingNew == waiting) {
|
||||
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
|
||||
(void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
|
||||
taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
|
||||
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
|
||||
ret = true;
|
||||
break;
|
||||
}
|
||||
|
@ -731,18 +731,18 @@ static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
|
|||
}
|
||||
}
|
||||
// to wait for process
|
||||
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
|
||||
(void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
|
||||
atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
|
||||
if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
|
||||
// recovered from waiting
|
||||
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
|
||||
if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
|
||||
tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
|
||||
taosThreadMutexLock(&pPool->poolLock);
|
||||
(void)taosThreadMutexLock(&pPool->poolLock);
|
||||
SListNode *pNode = listNode(pWorker);
|
||||
tdListPopNode(pPool->workers, pNode);
|
||||
// reclaim some workers
|
||||
|
@ -757,29 +757,29 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQ
|
|||
taosMemoryFree(head);
|
||||
}
|
||||
tdListAppendNode(pPool->exitedWorkers, pNode);
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
return false;
|
||||
}
|
||||
|
||||
// put back to backup pool
|
||||
tdListAppendNode(pPool->backupWorkers, pNode);
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
|
||||
// start to wait at backup cond
|
||||
taosThreadMutexLock(&pPool->backupLock);
|
||||
(void)taosThreadMutexLock(&pPool->backupLock);
|
||||
atomic_fetch_add_32(&pPool->backupNum, 1);
|
||||
if (!pPool->exit) taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
|
||||
taosThreadMutexUnlock(&pPool->backupLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->backupLock);
|
||||
|
||||
// recovered from backup
|
||||
taosThreadMutexLock(&pPool->poolLock);
|
||||
(void)taosThreadMutexLock(&pPool->poolLock);
|
||||
if (pPool->exit) {
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
return false;
|
||||
}
|
||||
tdListPopNode(pPool->backupWorkers, pNode);
|
||||
tdListAppendNode(pPool->workers, pNode);
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
|
||||
return true;
|
||||
} else {
|
||||
|
@ -819,7 +819,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
|
|||
}
|
||||
|
||||
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
|
||||
taosThreadMutexLock(&pPool->poolLock);
|
||||
(void)taosThreadMutexLock(&pPool->poolLock);
|
||||
pPool->exit = true;
|
||||
int32_t size = listNEles(pPool->workers);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
|
@ -829,31 +829,31 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
|
|||
for (int32_t i = 0; i < size; ++i) {
|
||||
taosQsetThreadResume(pPool->qset);
|
||||
}
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
|
||||
taosThreadMutexLock(&pPool->backupLock);
|
||||
(void)taosThreadMutexLock(&pPool->backupLock);
|
||||
taosThreadCondBroadcast(&pPool->backupCond);
|
||||
taosThreadMutexUnlock(&pPool->backupLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->backupLock);
|
||||
|
||||
taosThreadMutexLock(&pPool->waitingAfterBlockLock);
|
||||
(void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
|
||||
taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
|
||||
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
|
||||
|
||||
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
|
||||
(void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
|
||||
taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
|
||||
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
|
||||
|
||||
int32_t idx = 0;
|
||||
SQueryAutoQWorker *worker = NULL;
|
||||
while (true) {
|
||||
taosThreadMutexLock(&pPool->poolLock);
|
||||
(void)taosThreadMutexLock(&pPool->poolLock);
|
||||
if (listNEles(pPool->workers) == 0) {
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
break;
|
||||
}
|
||||
SListNode *pNode = tdListPopHead(pPool->workers);
|
||||
worker = (SQueryAutoQWorker *)pNode->data;
|
||||
taosThreadMutexUnlock(&pPool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->poolLock);
|
||||
if (worker && taosCheckPthreadValid(worker->thread)) {
|
||||
taosThreadJoin(worker->thread, NULL);
|
||||
taosThreadClear(&worker->thread);
|
||||
|
@ -905,7 +905,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
|
|||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pool->poolLock);
|
||||
(void)taosThreadMutexLock(&pool->poolLock);
|
||||
taosSetQueueFp(queue, fp, NULL);
|
||||
taosAddIntoQset(pool->qset, queue, ahandle);
|
||||
SQueryAutoQWorker worker = {0};
|
||||
|
@ -944,7 +944,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
|
|||
} while (pool->num < pool->min);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pool->poolLock);
|
||||
uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
|
||||
|
||||
return queue;
|
||||
|
@ -968,15 +968,15 @@ static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
|
|||
SQueryAutoQWorker worker = {0};
|
||||
worker.pool = pool;
|
||||
worker.backupIdx = -1;
|
||||
taosThreadMutexLock(&pool->poolLock);
|
||||
(void)taosThreadMutexLock(&pool->poolLock);
|
||||
worker.id = listNEles(pool->workers);
|
||||
SListNode *pNode = tdListAdd(pool->workers, &worker);
|
||||
if (!pNode) {
|
||||
taosThreadMutexUnlock(&pool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pool->poolLock);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
taosThreadMutexUnlock(&pool->poolLock);
|
||||
(void)taosThreadMutexUnlock(&pool->poolLock);
|
||||
pWorker = (SQueryAutoQWorker *)pNode->data;
|
||||
|
||||
TdThreadAttr thAttr;
|
||||
|
@ -1015,10 +1015,10 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
taosThreadMutexLock(&pPool->waitingAfterBlockLock);
|
||||
(void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
|
||||
atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
|
||||
if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
|
||||
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
|
||||
(void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
|
||||
if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue