more change

This commit is contained in:
Hongze Cheng 2024-09-24 16:48:07 +08:00
parent 85e3b26a4d
commit 4b75755af1
10 changed files with 133 additions and 53 deletions

View File

@ -292,7 +292,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap, const char* id); int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap, const char *id);
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_t *suid);
@ -1069,6 +1069,13 @@ int32_t tsdbSnapPrepDescription(SVnode *pVnode, SSnapshot *pSnap);
void tsdbRemoveFile(const char *path); void tsdbRemoveFile(const char *path);
#define taosCloseFileWithLog(fd) \
do { \
if (taosCloseFile(fd) < 0) { \
tsdbError("failed to close file, fd:%d, %s", fd, tstrerror(terrno)); \
} \
} while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -130,7 +130,7 @@ _exit:
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
} }
taosMemoryFree(pData); taosMemoryFree(pData);
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
return code; return code;
} }
@ -300,26 +300,26 @@ static int32_t load_fs(const char *fname, STsdbFS *pFS) {
int64_t size; int64_t size;
code = taosFStatFile(pFD, &size, NULL); code = taosFStatFile(pFD, &size, NULL);
if (code != 0) { if (code != 0) {
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
pData = taosMemoryMalloc(size); pData = taosMemoryMalloc(size);
if (pData == NULL) { if (pData == NULL) {
code = terrno; code = terrno;
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (taosReadFile(pFD, pData, size) < 0) { if (taosReadFile(pFD, pData, size) < 0) {
code = terrno; code = terrno;
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (!taosCheckChecksumWhole(pData, size)) { if (!taosCheckChecksumWhole(pData, size)) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -331,7 +331,7 @@ _exit:
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
} }
taosMemoryFree(pData); taosMemoryFree(pData);
(void)taosCloseFile(&pFD); taosCloseFileWithLog(&pFD);
return code; return code;
} }

View File

@ -44,7 +44,12 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
} }
fs[0]->tsdb = pTsdb; fs[0]->tsdb = pTsdb;
(void)tsem_init(&fs[0]->canEdit, 0, 1); int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
if (code) {
taosMemoryFree(fs[0]);
return code;
}
fs[0]->fsstate = TSDB_FS_STATE_NORMAL; fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
fs[0]->neid = 0; fs[0]->neid = 0;
TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArr);
@ -100,7 +105,7 @@ _exit:
tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code)); tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
} }
taosMemoryFree(data); taosMemoryFree(data);
(void)taosCloseFile(&fp); taosCloseFileWithLog(&fp);
return code; return code;
} }
@ -140,7 +145,7 @@ _exit:
tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code)); tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
json[0] = NULL; json[0] = NULL;
} }
(void)taosCloseFile(&fp); taosCloseFileWithLog(&fp);
taosMemoryFree(data); taosMemoryFree(data);
return code; return code;
} }

View File

@ -99,8 +99,12 @@ _exit:
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_from);
}
if (taosCloseFile(&fdTo) != 0) {
tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_to);
}
return code; return code;
} }
@ -136,8 +140,12 @@ _exit:
tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
if (taosCloseFile(&fdTo) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -441,7 +449,9 @@ _exit:
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -541,8 +551,13 @@ _exit:
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
if (taosCloseFile(&fdTo) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -639,8 +654,12 @@ _exit:
tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino, tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
tstrerror(code)); tstrerror(code));
} }
(void)taosCloseFile(&fdFrom); if (taosCloseFile(&fdFrom) != 0) {
(void)taosCloseFile(&fdTo); tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
if (taosCloseFile(&fdTo) != 0) {
tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
}
return code; return code;
} }
@ -699,7 +718,9 @@ static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
if (taosCheckExistFile(fname1)) { if (taosCheckExistFile(fname1)) {
int32_t mtime = 0; int32_t mtime = 0;
int64_t size = 0; int64_t size = 0;
(void)taosStatFile(fname1, &size, &mtime, NULL); if (taosStatFile(fname1, &size, &mtime, NULL) != 0) {
tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
}
if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) { if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit); TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
} }

View File

@ -187,10 +187,12 @@ static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
task->prev->next = task->next; task->prev->next = task->next;
task->next->prev = task->prev; task->next->prev = task->prev;
if (task->cancel) { if (task->cancel) {
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel, .cancel = task->cancel,
.arg = task->arg, .arg = task->arg,
})); }) == NULL) {
vError("failed to push cancel task into array");
};
} }
vnodeAsyncTaskDone(async, task); vnodeAsyncTaskDone(async, task);
} }
@ -748,10 +750,12 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
task->prev->next = task->next; task->prev->next = task->next;
task->next->prev = task->prev; task->next->prev = task->prev;
if (task->cancel) { if (task->cancel) {
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = task->cancel, .cancel = task->cancel,
.arg = task->arg, .arg = task->arg,
})); }) == NULL) {
vError("failed to push cancel info");
};
} }
vnodeAsyncTaskDone(async, task); vnodeAsyncTaskDone(async, task);
} }
@ -763,10 +767,12 @@ int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
channel->scheduled->prev->next = channel->scheduled->next; channel->scheduled->prev->next = channel->scheduled->next;
channel->scheduled->next->prev = channel->scheduled->prev; channel->scheduled->next->prev = channel->scheduled->prev;
if (channel->scheduled->cancel) { if (channel->scheduled->cancel) {
TAOS_UNUSED(taosArrayPush(cancelArray, &(SVATaskCancelInfo){ if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
.cancel = channel->scheduled->cancel, .cancel = channel->scheduled->cancel,
.arg = channel->scheduled->arg, .arg = channel->scheduled->arg,
})); }) == NULL) {
vError("failed to push cancel info");
}
} }
vnodeAsyncTaskDone(async, channel->scheduled); vnodeAsyncTaskDone(async, channel->scheduled);
} }

View File

@ -201,7 +201,9 @@ _exit:
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname, vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion); pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, pInfo->config.syncCfg.changeVersion);
} }
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", pInfo->config.vgId);
}
taosMemoryFree(data); taosMemoryFree(data);
return code; return code;
} }
@ -263,7 +265,9 @@ _exit:
} }
} }
taosMemoryFree(pData); taosMemoryFree(pData);
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", pInfo->config.vgId);
}
return code; return code;
} }
@ -496,7 +500,9 @@ void vnodeRollback(SVnode *pVnode) {
offset = strlen(tFName); offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP); snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
TAOS_UNUSED(taosRemoveFile(tFName)); if (taosRemoveFile(tFName) != 0) {
vError("vgId:%d, failed to remove file %s since %s", TD_VID(pVnode), tFName, tstrerror(terrno));
}
} }
static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeEncodeState(const void *pObj, SJson *pJson) {

View File

@ -274,13 +274,17 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
int64_t size; int64_t size;
code = taosFStatFile(pFile, &size, NULL); code = taosFStatFile(pFile, &size, NULL);
if (code != 0) { if (code != 0) {
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1); *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
if (*ppData == NULL) { if (*ppData == NULL) {
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
TSDB_CHECK_CODE(code = terrno, lino, _exit); TSDB_CHECK_CODE(code = terrno, lino, _exit);
} }
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG; ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
@ -289,11 +293,15 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) { if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
taosMemoryFree(*ppData); taosMemoryFree(*ppData);
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
TSDB_CHECK_CODE(code = terrno, lino, _exit); TSDB_CHECK_CODE(code = terrno, lino, _exit);
} }
(void)taosCloseFile(&pFile); if (taosCloseFile(&pFile) != 0) {
vError("vgId:%d, failed to close file", vgId);
}
pReader->cfgDone = 1; pReader->cfgDone = 1;
goto _exit; goto _exit;

View File

@ -28,7 +28,9 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg, vGTrace("vgId:%d, msg:%p wait block, type:%s sec:%d seq:%" PRId64, pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq); TMSG_INFO(pMsg->msgType), pVnode->blockSec, pVnode->blockSeq);
(void)tsem_wait(&pVnode->syncSem); if (tsem_wait(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to wait sem", pVnode->config.vgId);
}
} }
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
@ -41,7 +43,9 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
pVnode->blocked = false; pVnode->blocked = false;
pVnode->blockSec = 0; pVnode->blockSec = 0;
pVnode->blockSeq = 0; pVnode->blockSeq = 0;
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post sem", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
@ -613,7 +617,9 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
if (pVnode->blocked) { if (pVnode->blocked) {
pVnode->blocked = false; pVnode->blocked = false;
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId); vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
@ -633,7 +639,9 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
if (pVnode->blocked) { if (pVnode->blocked) {
pVnode->blocked = false; pVnode->blocked = false;
vDebug("vgId:%d, become learner and post block", pVnode->config.vgId); vDebug("vgId:%d, become learner and post block", pVnode->config.vgId);
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post sync semaphore", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
@ -766,7 +774,9 @@ void vnodeSyncPreClose(SVnode *pVnode) {
if (pVnode->blocked) { if (pVnode->blocked) {
vInfo("vgId:%d, post block after close sync", pVnode->config.vgId); vInfo("vgId:%d, post block after close sync", pVnode->config.vgId);
pVnode->blocked = false; pVnode->blocked = false;
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post block", pVnode->config.vgId);
}
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);
} }
@ -801,7 +811,9 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
pVnode->blocked = false; pVnode->blocked = false;
pVnode->blockSec = 0; pVnode->blockSec = 0;
pVnode->blockSeq = 0; pVnode->blockSeq = 0;
(void)tsem_post(&pVnode->syncSem); if (tsem_post(&pVnode->syncSem) != 0) {
vError("vgId:%d, failed to post block", pVnode->config.vgId);
}
} }
} }
(void)taosThreadMutexUnlock(&pVnode->lock); (void)taosThreadMutexUnlock(&pVnode->lock);

View File

@ -232,7 +232,7 @@ int32_t tsem_init(tsem_t *psem, int flags, unsigned int count) {
if(sem_init(psem, flags, count) == 0) { if(sem_init(psem, flags, count) == 0) {
return 0; return 0;
} else { } else {
return TAOS_SYSTEM_ERROR(errno); return terrno = TAOS_SYSTEM_ERROR(errno);
} }
} }

View File

@ -224,7 +224,9 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
(void)taosThreadMutexUnlock(&queue->mutex); (void)taosThreadMutexUnlock(&queue->mutex);
if (queue->qset) { if (queue->qset) {
(void)tsem_post(&queue->qset->sem); if (tsem_post(&queue->qset->sem) != 0) {
uError("failed to post semaphore for queue set:%p", queue->qset);
}
} }
return code; return code;
} }
@ -333,7 +335,10 @@ int32_t taosOpenQset(STaosQset **qset) {
} }
(void)taosThreadMutexInit(&(*qset)->mutex, NULL); (void)taosThreadMutexInit(&(*qset)->mutex, NULL);
(void)tsem_init(&(*qset)->sem, 0, 0); if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
taosMemoryFree(*qset);
return terrno;
}
uDebug("qset:%p is opened", qset); uDebug("qset:%p is opened", qset);
return 0; return 0;
@ -354,7 +359,9 @@ void taosCloseQset(STaosQset *qset) {
(void)taosThreadMutexUnlock(&qset->mutex); (void)taosThreadMutexUnlock(&qset->mutex);
(void)taosThreadMutexDestroy(&qset->mutex); (void)taosThreadMutexDestroy(&qset->mutex);
(void)tsem_destroy(&qset->sem); if (tsem_destroy(&qset->sem) != 0) {
uError("failed to destroy semaphore for qset:%p", qset);
}
taosMemoryFree(qset); taosMemoryFree(qset);
uDebug("qset:%p is closed", qset); uDebug("qset:%p is closed", qset);
} }
@ -364,7 +371,9 @@ void taosCloseQset(STaosQset *qset) {
// thread to exit. // thread to exit.
void taosQsetThreadResume(STaosQset *qset) { void taosQsetThreadResume(STaosQset *qset) {
uDebug("qset:%p, it will exit", qset); uDebug("qset:%p, it will exit", qset);
(void)tsem_post(&qset->sem); if (tsem_post(&qset->sem) != 0) {
uError("failed to post semaphore for qset:%p", qset);
}
} }
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) { int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
@ -432,7 +441,9 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
(void)tsem_wait(&qset->sem); if (tsem_wait(&qset->sem) != 0) {
uError("failed to wait semaphore for qset:%p", qset);
}
(void)taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
@ -476,7 +487,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
STaosQueue *queue; STaosQueue *queue;
int32_t code = 0; int32_t code = 0;
(void)tsem_wait(&qset->sem); if (tsem_wait(&qset->sem) != 0) {
uError("failed to wait semaphore for qset:%p", qset);
}
(void)taosThreadMutexLock(&qset->mutex); (void)taosThreadMutexLock(&qset->mutex);
for (int32_t i = 0; i < qset->numOfQueues; ++i) { for (int32_t i = 0; i < qset->numOfQueues; ++i) {
@ -510,7 +523,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
(void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) { for (int32_t j = 1; j < qall->numOfItems; ++j) {
(void)tsem_wait(&qset->sem); if (tsem_wait(&qset->sem) != 0) {
uError("failed to wait semaphore for qset:%p", qset);
}
} }
} }