diff --git a/include/util/tsched.h b/include/util/tsched.h index 347cacd191..379456afe6 100644 --- a/include/util/tsched.h +++ b/include/util/tsched.h @@ -31,7 +31,6 @@ typedef struct SSchedMsg { void *thandle; } SSchedMsg; - typedef struct { char label[TSDB_LABEL_LEN]; tsem_t emptySem; @@ -48,7 +47,6 @@ typedef struct { void *pTimer; } SSchedQueue; - /** * Create a thread-safe ring-buffer based task queue and return the instance. A thread * pool will be created to consume the messages in the queue. @@ -57,7 +55,7 @@ typedef struct { * @param label the label of the queue * @return the created queue scheduler */ -void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue* pSched); +void *taosInitScheduler(int32_t capacity, int32_t numOfThreads, const char *label, SSchedQueue *pSched); /** * Create a thread-safe ring-buffer based task queue and return the instance. @@ -83,7 +81,7 @@ void taosCleanUpScheduler(void *queueScheduler); * @param queueScheduler the queue scheduler instance * @param pMsg the message for the task */ -void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg); +int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5ebc2729f8..39b4b069a0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1399,7 +1399,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { arg->msg = *pMsg; arg->pEpset = tEpSet; - taosAsyncExec(doProcessMsgFromServer, arg, NULL); + if (0 != taosAsyncExec(doProcessMsgFromServer, arg, NULL)) { + tscError("failed to sched msg to tsc, tsc ready to quit"); + rpcFreeCont(pMsg->pCont); + taosMemoryFree(arg->pEpset); + taosMemoryFree(arg); + } } TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index d848016e46..8162b922ce 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -134,8 +134,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) schedMsg.thandle = execParam; schedMsg.msg = code; - taosScheduleTask(&pTaskQueue, &schedMsg); - return 0; + return taosScheduleTask(&pTaskQueue, &schedMsg); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { @@ -472,5 +471,3 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { return TSDB_CODE_SUCCESS; } - - diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 89471c4347..9cf9e2c431 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -149,18 +149,18 @@ void *taosProcessSchedQueue(void *scheduler) { return NULL; } -void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { +int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)queueScheduler; int32_t ret = 0; if (pSched == NULL) { uError("sched is not ready, msg:%p is dropped", pMsg); - return; + return -1; } if (atomic_load_8(&pSched->stop)) { uError("sched is already stopped, msg:%p is dropped", pMsg); - return; + return -1; } if ((ret = tsem_wait(&pSched->emptySem)) != 0) { @@ -185,6 +185,7 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno)); ASSERT(0); } + return ret; } void taosCleanUpScheduler(void *param) { @@ -192,11 +193,11 @@ void taosCleanUpScheduler(void *param) { if (pSched == NULL) return; uDebug("start to cleanup %s schedQsueue", pSched->label); - + atomic_store_8(&pSched->stop, 1); taosMsleep(200); - + for (int32_t i = 0; i < pSched->numOfThreads; ++i) { if (taosCheckPthreadValid(pSched->qthread[i])) { tsem_post(&pSched->fullSem); @@ -220,7 +221,7 @@ void taosCleanUpScheduler(void *param) { if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->qthread) taosMemoryFree(pSched->qthread); - //taosMemoryFree(pSched); + // taosMemoryFree(pSched); } // for debug purpose, dump the scheduler status every 1min.