Merge branch 'feature/scheduler' of github.com:taosdata/TDengine into feature/scheduler

This commit is contained in:
dapan1121 2022-03-24 16:39:47 +08:00
commit 64d4fb0bfd
2 changed files with 21 additions and 9 deletions

View File

@ -274,12 +274,15 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
if (cVal == NULL) { if (cVal == NULL) {
return NULL; return NULL;
} }
void *ret = NULL; void* ret = NULL;
(*cVal->clone)(cVal->val, &ret); (*cVal->clone)(cVal->val, &ret);
return ret; return ret;
} }
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
void *ret = NULL; void* ret = NULL;
if (ctx->brokenVal.clone == NULL) {
return ret;
}
(*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret); (*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
*msgType = ctx->brokenVal.msgType; *msgType = ctx->brokenVal.msgType;
@ -292,6 +295,9 @@ void transQueueInit(STransQueue* queue, void (*free)(void* arg)) {
queue->free = free; queue->free = free;
} }
bool transQueuePush(STransQueue* queue, void* arg) { bool transQueuePush(STransQueue* queue, void* arg) {
if (queue->q == NULL) {
return true;
}
taosArrayPush(queue->q, &arg); taosArrayPush(queue->q, &arg);
if (taosArrayGetSize(queue->q) > 1) { if (taosArrayGetSize(queue->q) > 1) {
return false; return false;
@ -299,7 +305,7 @@ bool transQueuePush(STransQueue* queue, void* arg) {
return true; return true;
} }
void* transQueuePop(STransQueue* queue) { void* transQueuePop(STransQueue* queue) {
if (taosArrayGetSize(queue->q) == 0) { if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL; return NULL;
} }
void* ptr = taosArrayGetP(queue->q, 0); void* ptr = taosArrayGetP(queue->q, 0);
@ -307,11 +313,13 @@ void* transQueuePop(STransQueue* queue) {
return ptr; return ptr;
} }
int32_t transQueueSize(STransQueue* queue) { int32_t transQueueSize(STransQueue* queue) {
// Get size if (queue->q == NULL) {
return 0;
}
return taosArrayGetSize(queue->q); return taosArrayGetSize(queue->q);
} }
void* transQueueGet(STransQueue* queue, int i) { void* transQueueGet(STransQueue* queue, int i) {
if (taosArrayGetSize(queue->q) == 0) { if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL; return NULL;
} }
if (i >= taosArrayGetSize(queue->q)) { if (i >= taosArrayGetSize(queue->q)) {
@ -323,7 +331,7 @@ void* transQueueGet(STransQueue* queue, int i) {
} }
void* transQueueRm(STransQueue* queue, int i) { void* transQueueRm(STransQueue* queue, int i) {
if (taosArrayGetSize(queue->q) == 0) { if (queue->q == NULL || taosArrayGetSize(queue->q) == 0) {
return NULL; return NULL;
} }
if (i >= taosArrayGetSize(queue->q)) { if (i >= taosArrayGetSize(queue->q)) {
@ -335,7 +343,9 @@ void* transQueueRm(STransQueue* queue, int i) {
} }
bool transQueueEmpty(STransQueue* queue) { bool transQueueEmpty(STransQueue* queue) {
// if (queue->q == NULL) {
return true;
}
return taosArrayGetSize(queue->q) == 0; return taosArrayGetSize(queue->q) == 0;
} }
void transQueueClear(STransQueue* queue) { void transQueueClear(STransQueue* queue) {

View File

@ -623,8 +623,6 @@ static void destroyConn(SSrvConn* conn, bool clear) {
return; return;
} }
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->srvMsgs);
if (clear) { if (clear) {
tTrace("server conn %p to be destroyed", conn); tTrace("server conn %p to be destroyed", conn);
uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t));
@ -640,6 +638,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
tDebug("server conn %p destroy", conn); tDebug("server conn %p destroy", conn);
uv_timer_stop(&conn->pTimer); uv_timer_stop(&conn->pTimer);
transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
free(conn->pTcp); free(conn->pTcp);
// free(conn); // free(conn);
@ -839,6 +838,9 @@ void transSendResponse(const STransMsg* pMsg) {
} }
SSrvConn* pConn = pMsg->handle; SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd; SWorkThrdObj* pThrd = pConn->hostThrd;
if (pThrd->quit) {
return;
}
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
srvMsg->pConn = pConn; srvMsg->pConn = pConn;