remove test code

This commit is contained in:
wangjiaming0909 2024-06-26 16:46:37 +08:00
parent 02a7d29841
commit a25571313f
3 changed files with 13 additions and 20 deletions

View File

@ -130,13 +130,11 @@ typedef struct SQueryAutoQWorkerPool {
int32_t num; int32_t num;
int32_t max; int32_t max;
int32_t min; int32_t min;
int32_t maxRunning; int32_t maxInUse;
int32_t activeN; // running workers and workers waiting at reading new queue msg int32_t activeN; // running workers and workers waiting at reading new queue msg
int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers.
int32_t blockingN; // blocked worker num, like exchangeoperator sem_wait
int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers
TdThreadMutex waitingAfterBlockLock; TdThreadMutex waitingAfterBlockLock;
TdThreadCond waitingAfterBlockCond; TdThreadCond waitingAfterBlockCond;
@ -165,9 +163,9 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pPool, void *ahan
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ); void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ);
typedef struct SQueryAutoQWorkerPoolCB { typedef struct SQueryAutoQWorkerPoolCB {
SQueryAutoQWorkerPool* pPool; void *pPool;
int32_t (*beforeBlocking)(void* pPool); int32_t (*beforeBlocking)(void *pPool);
int32_t (*afterRecoverFromBlocking)(void* pPool); int32_t (*afterRecoverFromBlocking)(void *pPool);
} SQueryAutoQWorkerPoolCB; } SQueryAutoQWorkerPoolCB;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -254,7 +254,6 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->queryId = qId; req->queryId = qId;
req->taskId = tId; req->taskId = tId;
req->execId = eId; req->execId = eId;
//taosMsleep(500);
SRpcMsg pNewMsg = { SRpcMsg pNewMsg = {
.msgType = TDMT_SCH_QUERY_CONTINUE, .msgType = TDMT_SCH_QUERY_CONTINUE,

View File

@ -482,12 +482,10 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) {
} }
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool); static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool);
static void tQueryAutoQWorkerStartRunning(void *p);
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
static int32_t tQueryAutoQWorkerWaitingCheck(void *p); static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool);
static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker); static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker);
static void tQueryAutoQWorkerFinish(void *p);
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
SQueryAutoQWorkerPool *pool = worker->pool; SQueryAutoQWorkerPool *pool = worker->pool;
@ -498,7 +496,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
worker->pid = taosGetSelfPthreadId(); worker->pid = taosGetSelfPthreadId();
uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
@ -514,8 +512,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
} }
} }
code = tQueryAutoQWorkerWaitingCheck(pool); tQueryAutoQWorkerWaitingCheck(pool);
// TODO wjm what if taosd is exiting
if (qinfo.fp != NULL) { if (qinfo.fp != NULL) {
qinfo.workerId = worker->id; qinfo.workerId = worker->id;
@ -526,6 +523,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
taosUpdateItemSize(qinfo.queue, 1); taosUpdateItemSize(qinfo.queue, 1);
if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) { if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) {
uDebug("worker:%s:%d exited", pool->name, worker->id);
break; break;
} }
} }
@ -588,8 +586,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
return ret; return ret;
} }
static int32_t tQueryAutoQWorkerWaitingCheck(void *p) { static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) {
SQueryAutoQWorkerPool* pPool = p;
int32_t running = pPool->runningN; int32_t running = pPool->runningN;
while (running < pPool->num) { while (running < pPool->num) {
int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
@ -606,7 +603,6 @@ static int32_t tQueryAutoQWorkerWaitingCheck(void *p) {
if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
// recovered from waiting // recovered from waiting
taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
if (pPool->exit) return -1; // TODO wjm error code
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -617,8 +613,8 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ
SListNode* pNode = listNode(pWorker); SListNode* pNode = listNode(pWorker);
tdListPopNode(pPool->workers, pNode); tdListPopNode(pPool->workers, pNode);
// reclaim some workers // reclaim some workers
if (pWorker->id >= pPool->num * 2 ) { if (pWorker->id >= pPool->maxInUse) {
while (listNEles(pPool->exitedWorkers) > pPool->num) { while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
SListNode* head = tdListPopHead(pPool->exitedWorkers); SListNode* head = tdListPopHead(pPool->exitedWorkers);
SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data; SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data;
if (pWorker && taosCheckPthreadValid(pWorker->thread)) { if (pWorker && taosCheckPthreadValid(pWorker->thread)) {
@ -668,6 +664,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY; if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY;
pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker)); pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY; if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY;
pool->maxInUse = pool->max * 2 + 2;
(void)taosThreadMutexInit(&pool->poolLock, NULL); (void)taosThreadMutexInit(&pool->poolLock, NULL);
(void)taosThreadMutexInit(&pool->backupLock, NULL); (void)taosThreadMutexInit(&pool->backupLock, NULL);
@ -732,7 +729,6 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
} }
while (listNEles(pPool->backupWorkers) > 0) { while (listNEles(pPool->backupWorkers) > 0) {
// TODO wjm need lock?
SListNode* pNode = tdListPopHead(pPool->backupWorkers); SListNode* pNode = tdListPopHead(pPool->backupWorkers);
worker = (SQueryAutoQWorker*)pNode->data; worker = (SQueryAutoQWorker*)pNode->data;
if (worker && taosCheckPthreadValid(worker->thread)) { if (worker && taosCheckPthreadValid(worker->thread)) {
@ -891,6 +887,6 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1); atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock); if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
if (pPool->exit) return -1; // TODO wjm error code if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }