diff --git a/docs/zh/12-taos-sql/21-node.md b/docs/zh/12-taos-sql/21-node.md index 47aa2077a3..4816daf420 100644 --- a/docs/zh/12-taos-sql/21-node.md +++ b/docs/zh/12-taos-sql/21-node.md @@ -40,7 +40,6 @@ ALTER ALL DNODES dnode_option dnode_option: { 'resetLog' - | 'resetQueryCache' | 'balance' value | 'monitor' value | 'debugFlag' value diff --git a/docs/zh/17-operation/02-planning.mdx b/docs/zh/17-operation/02-planning.mdx index 5cf3295288..c45181f416 100644 --- a/docs/zh/17-operation/02-planning.mdx +++ b/docs/zh/17-operation/02-planning.mdx @@ -18,7 +18,7 @@ title: 容量规划 关于这些参数的详细说明请参考 [数据库管理](../taos-sql/database)。 -一个数据库所需要的内存大小等于 +一个数据库所需要的内存大小等于 ``` vgroups * replica * (buffer + pages * pagesize + cachesize) @@ -26,7 +26,6 @@ vgroups * replica * (buffer + pages * pagesize + cachesize) 但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加,并由集群中所有服务器共同负担。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。 - ## 客户端内存需求 客户端应用采用 taosc 客户端驱动连接服务端,会有内存需求的开销。 diff --git a/docs/zh/17-operation/03-tolerance.md b/docs/zh/17-operation/03-tolerance.md index e2f40778ff..2be914c6b2 100644 --- a/docs/zh/17-operation/03-tolerance.md +++ b/docs/zh/17-operation/03-tolerance.md @@ -4,16 +4,16 @@ title: 容错和灾备 ## 容错 -TDengine 支持**WAL**(Write Ahead Log)机制,实现数据的容错能力,保证数据的高可用。 +TDengine 支持 **WAL**(Write Ahead Log)机制,实现数据的容错能力,保证数据的高可用。 TDengine 接收到应用的请求数据包时,先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失。 涉及的系统配置参数有两个: -- wal_level:WAL 级别,1:写WAL,但不执行fsync。2:写WAL,而且执行fsync。默认值为 1。 -- wal_fsync_period:当 wal_evel 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。 +- wal_level:WAL 级别,1:写 WAL,但不执行 fsync。2:写 WAL,而且执行 fsync。默认值为 1。 +- wal_fsync_period:当 wal_level 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。 -如果要 100%的保证数据不丢失,需要将 wal_level 设置为 2,fsync 设置为 0。这时写入速度将会下降。但如果应用侧启动的写数据的线程数达到一定的数量(超过 50),那么写入数据的性能也会很不错,只会比 fsync 设置为 3000 毫秒下降 30%左右。 +如果要 100%的保证数据不丢失,需要将 wal_level 设置为 2,wal_fsync_period 设置为 0。这时写入速度将会下降。但如果应用侧启动的写数据的线程数达到一定的数量(超过 50),那么写入数据的性能也会很不错,只会比 wal_fsync_period 设置为 3000 毫秒下降 30%左右。 ## 灾备 diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d84455ac94..3f90f087fd 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -805,14 +805,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { return -1; } - SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId); - if (pDnode == NULL) { - mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr()); - return -1; - } - SEpSet epSet = mndGetDnodeEpset(pDnode); - mndReleaseDnode(pMnode, pDnode); - SDCfgDnodeReq dcfgReq = {0}; if (strcasecmp(cfgReq.config, "resetlog") == 0) { strcpy(dcfgReq.config, "resetlog"); @@ -860,16 +852,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { } } - int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq); - void *pBuf = rpcMallocCont(bufLen); + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SDnodeObj *pDnode = NULL; + pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); + if (pIter == NULL) break; - if (pBuf == NULL) return -1; - tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq); + if (pDnode->id == cfgReq.dnodeId || cfgReq.dnodeId == -1 || cfgReq.dnodeId == 0) { + SEpSet epSet = mndGetDnodeEpset(pDnode); + int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq); + void *pBuf = rpcMallocCont(bufLen); - mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle, - dcfgReq.config, dcfgReq.value); - SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen}; - return tmsgSendReq(&epSet, &rpcMsg); + if (pBuf != NULL) { + tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq); + mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle, + dcfgReq.config, dcfgReq.value); + SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen}; + tmsgSendReq(&epSet, &rpcMsg); + code = 0; + } + } + + sdbRelease(pSdb, pDnode); + } + + if (code == -1) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + } + return code; } static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 2d7117c11f..b2a0e6aac8 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1287,6 +1287,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { + pTrans->failedTimes++; pTrans->code = terrno; if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->lastAction != 0) { @@ -1306,7 +1307,6 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr()); continueExec = true; } else { - pTrans->failedTimes++; mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = false; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4323fa0aff..a0e5071685 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -281,8 +281,8 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType), - pMsg->info.handle); + vGInfo("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%ld", vgId, pMsg, + TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex); SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.code == 0) { @@ -503,9 +503,6 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { if (cbMeta.isWeak == 0) { SVnode *pVnode = pFsm->data; - vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); if (cbMeta.code == 0) { SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; @@ -514,11 +511,17 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyTerm = cbMeta.term; + + vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak, + cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } else { SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), - pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); + vError("vgId:%d, sync commit error, msgtype:%d,%s, index:%ld, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), + pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fb82ab206c..200df6bc80 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -612,8 +612,7 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) { SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); if (IS_NULL_TYPE(type)) { - GET_RES_INFO(pCtx)->isNullRes = 1; - numOfElem = 1; + numOfElem = 0; goto _sum_over; } @@ -1172,8 +1171,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); if (IS_NULL_TYPE(type)) { - GET_RES_INFO(pCtx)->isNullRes = 1; - numOfElems = 1; + numOfElems = 0; goto _min_max_over; } diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 7fc41b8dff..e1c8ac0204 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -579,11 +579,13 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou if (ctx->noExec == false) { for (int32_t m = 0; m < node->pParameterList->length; m++) { - // add impl later if (node->condType == LOGIC_COND_TYPE_AND) { taosArrayAddAll(output->result, params[m].result); + // taosArrayDestroy(params[m].result); + // params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_OR) { taosArrayAddAll(output->result, params[m].result); + // params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_NOT) { // taosArrayAddAll(output->result, params[m].result); } @@ -593,6 +595,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou } else { for (int32_t m = 0; m < node->pParameterList->length; m++) { output->status = sifMergeCond(node->condType, output->status, params[m].status); + taosArrayDestroy(params[m].result); + params[m].result = NULL; } } _return: @@ -607,6 +611,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecFunction(node, ctx, &output); if (ctx->code != TSDB_CODE_SUCCESS) { + sifFreeParam(&output); return DEAL_RES_ERROR; } @@ -624,6 +629,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecLogic(node, ctx, &output); if (ctx->code) { + sifFreeParam(&output); return DEAL_RES_ERROR; } @@ -640,6 +646,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecOper(node, ctx, &output); if (ctx->code) { + sifFreeParam(&output); return DEAL_RES_ERROR; } if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { @@ -698,7 +705,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { } nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); - SIF_ERR_RET(ctx.code); + + if (ctx.code != 0) { + sifFreeRes(ctx.pRes); + return ctx.code; + } if (pDst) { SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); @@ -714,8 +725,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } sifFreeRes(ctx.pRes); - - SIF_RET(code); + return code; } static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { @@ -732,8 +742,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { } nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); - - SIF_ERR_RET(ctx.code); + if (ctx.code != 0) { + sifFreeRes(ctx.pRes); + return ctx.code; + } SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); if (res == NULL) { @@ -745,8 +757,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { sifFreeParam(res); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashCleanup(ctx.pRes); - - SIF_RET(code); + return code; } int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) { @@ -760,7 +771,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SArray *output = taosArrayInit(8, sizeof(uint64_t)); SIFParam param = {.arg = *metaArg, .result = output}; - SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); + int32_t code = sifCalculate((SNode *)pFilterNode, ¶m); + if (code != 0) { + sifFreeParam(¶m); + return code; + } taosArrayAddAll(result, param.result); sifFreeParam(¶m); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 064c110a9f..efbe110f6f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,5 +1,4 @@ /** Copyright (c) 2019 TAOS Data, Inc. - * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -16,6 +15,10 @@ #ifdef USE_UV #include "transComm.h" +typedef struct SConnList { + queue conn; +} SConnList; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -26,7 +29,9 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; - queue q; + + queue q; + SConnList* list; STransCtx ctx; bool broken; // link broken or not @@ -56,13 +61,14 @@ typedef struct SCliMsg { } SCliMsg; typedef struct SCliThrd { - TdThread thread; // tid - int64_t pid; // pid - uv_loop_t* loop; - SAsyncPool* asyncPool; - uv_idle_t* idle; - uv_timer_t timer; - void* pool; // conn pool + TdThread thread; // tid + int64_t pid; // pid + uv_loop_t* loop; + SAsyncPool* asyncPool; + uv_idle_t* idle; + uv_prepare_t* prepare; + uv_timer_t timer; + void* pool; // conn pool // msg queue queue msg; @@ -86,10 +92,6 @@ typedef struct SCliObj { SCliThrd** pThreadObj; } SCliObj; -typedef struct SConnList { - queue conn; -} SConnList; - // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); @@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param); static int sockDebugInfo(struct sockaddr* sockname, char* dst) { struct sockaddr_in addr = *(struct sockaddr_in*)sockname; - char buf[20] = {0}; + char buf[16] = {0}; int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); return r; @@ -118,6 +120,9 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); +static void cliPrepareCb(uv_prepare_t* handle); + +static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); @@ -198,7 +203,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) +#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -499,9 +504,8 @@ void* destroyConnPool(void* pool) { } static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { - char key[128] = {0}; + char key[32] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - SHashObj* pPool = pool; SConnList* plist = taosHashGet(pPool, key, strlen(key)); if (plist == NULL) { @@ -519,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { conn->status = ConnNormal; QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); - assert(h == &conn->q); transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); conn->task = NULL; return conn; } +static void addConnToPool(void* pool, SCliConn* conn) { + if (conn->status == ConnInPool) { + return; + } + SCliThrd* thrd = conn->hostThrd; + CONN_HANDLE_THREAD_QUIT(thrd); + + allocConnRef(conn, true); + + STrans* pTransInst = thrd->pTransInst; + cliReleaseUnfinishedMsg(conn); + transQueueClear(&conn->cliMsgs); + transCtxCleanup(&conn->ctx); + conn->status = ConnInPool; + + if (conn->list == NULL) { + char key[32] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); + tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); + conn->list = taosHashGet((SHashObj*)pool, key, strlen(key)); + } + assert(conn->list != NULL); + QUEUE_INIT(&conn->q); + QUEUE_PUSH(&conn->list->conn, &conn->q); + + assert(!QUEUE_IS_EMPTY(&conn->list->conn)); + + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); +} static int32_t allocConnRef(SCliConn* conn, bool update) { if (update) { transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -556,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { return 0; } -static void addConnToPool(void* pool, SCliConn* conn) { - if (conn->status == ConnInPool) { - return; - } - SCliThrd* thrd = conn->hostThrd; - CONN_HANDLE_THREAD_QUIT(thrd); - - allocConnRef(conn, true); - - STrans* pTransInst = thrd->pTransInst; - cliReleaseUnfinishedMsg(conn); - transQueueClear(&conn->cliMsgs); - transCtxCleanup(&conn->ctx); - conn->status = ConnInPool; - - char key[128] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); - - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - // list already create before - assert(plist != NULL); - QUEUE_INIT(&conn->q); - QUEUE_PUSH(&plist->conn, &conn->q); - - assert(!QUEUE_IS_EMPTY(&plist->conn)); - - STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); - arg->param1 = conn; - arg->param2 = thrd; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); -} static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -602,11 +605,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - if (transReadComplete(pBuf)) { + while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); cliHandleResp(conn); - } else { - tTrace("%s conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn); } return; } @@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) { static void cliIdleCb(uv_idle_t* handle) { SCliThrd* thrd = handle->data; tTrace("do idle work"); + + SAsyncPool* pool = thrd->asyncPool; + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg == NULL) { + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, thrd); + count++; + } + } + tTrace("prepare work end"); + if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); +} +static void cliPrepareCb(uv_prepare_t* handle) { + SCliThrd* thrd = handle->data; + tTrace("prepare work start"); + + SAsyncPool* pool = thrd->asyncPool; + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg == NULL) { + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, thrd); + count++; + } + } + tTrace("prepare work end"); + if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } static void* cliWorkThread(void* arg) { @@ -1033,7 +1090,12 @@ static SCliThrd* createThrdObj() { // pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t)); // uv_idle_init(pThrd->loop, pThrd->idle); // pThrd->idle->data = pThrd; - // uv_idle_start(pThrd->idle, cliIdleCb); + // uv_idle_start(pThrd->idle, cliIdleCb); + + pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); + uv_prepare_init(pThrd->loop, pThrd->prepare); + pThrd->prepare->data = pThrd; + uv_prepare_start(pThrd->prepare, cliPrepareCb); pThrd->pool = createConnPool(4); transDQCreate(pThrd->loop, &pThrd->delayQueue); @@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->timeoutQueue, NULL); taosMemoryFree(pThrd->idle); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c99effb26f..8cf525a506 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -120,8 +120,9 @@ int transInitBuffer(SConnBuffer* buf) { buf->total = 0; return 0; } -int transDestroyBuffer(SConnBuffer* buf) { - taosMemoryFree(buf->buf); +int transDestroyBuffer(SConnBuffer* p) { + taosMemoryFree(p->buf); + p->buf = NULL; return 0; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a97e0b53c1..4b579a1f95 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -73,6 +73,7 @@ typedef struct SWorkThrd { uv_os_fd_t fd; uv_loop_t* loop; SAsyncPool* asyncPool; + uv_prepare_t* prepare; queue msg; TdThreadMutex msgMtx; @@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); +static void uvPrepareCb(uv_prepare_t* handle); /* * time-consuming task throwed into BG work thread @@ -238,8 +240,6 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - // transClearBuffer(&pConn->readBuf); - pConn->inType = pHead->msgType; if (pConn->status == ConnNormal) { if (pHead->persist == 1) { @@ -546,6 +546,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, uvDestroyConn); taosMemoryFree(req); } +static void uvPrepareCb(uv_prepare_t* handle) { + // prepare callback + SWorkThrd* pThrd = handle->data; + SAsyncPool* pool = pThrd->asyncPool; + + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + while (!QUEUE_IS_EMPTY(&wq)) { + queue* head = QUEUE_HEAD(&wq); + QUEUE_REMOVE(head); + + SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); + if (msg == NULL) { + tError("unexcept occurred, continue"); + continue; + } + // release handle to rpc init + if (msg->type == Quit) { + (*transAsyncHandle[msg->type])(msg, pThrd); + continue; + } else { + STransMsg transMsg = msg->msg; + + SExHandle* exh1 = transMsg.info.handle; + int64_t refId = transMsg.info.refId; + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + if (exh2 == NULL || exh1 != exh2) { + tTrace("handle except msg %p, ignore it", exh1); + transReleaseExHandle(transGetRefMgt(), refId); + destroySmsg(msg); + continue; + } + msg->pConn = exh1->handle; + transReleaseExHandle(transGetRefMgt(), refId); + (*transAsyncHandle[msg->type])(msg, pThrd); + } + } + } +} static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task @@ -695,13 +741,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { } uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - // int r = uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); + pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); + uv_prepare_init(pThrd->loop, pThrd->prepare); + uv_prepare_start(pThrd->prepare, uvPrepareCb); + pThrd->prepare->data = pThrd; + // conn set QUEUE_INIT(&pThrd->conn); @@ -986,6 +1036,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); transAsyncPoolDestroy(pThrd->asyncPool); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 4a6a1ca0b8..6de5a9ab98 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -166,7 +166,7 @@ # ---- query ---- ./test.sh -f tsim/query/charScalarFunction.sim -# ./test.sh -f tsim/query/explain.sim +./test.sh -f tsim/query/explain.sim ./test.sh -f tsim/query/interval-offset.sim ./test.sh -f tsim/query/interval.sim ./test.sh -f tsim/query/scalarFunction.sim diff --git a/tests/script/tsim/mnode/basic4.sim b/tests/script/tsim/mnode/basic4.sim index 6b4c8c6b7c..0ffcdd8c00 100644 --- a/tests/script/tsim/mnode/basic4.sim +++ b/tests/script/tsim/mnode/basic4.sim @@ -45,11 +45,8 @@ endi if $data(2)[4] != ready then goto step2 endi -if $data(3)[4] != ready then - goto step2 -endi -system sh/exec.sh -n dnode3 -s stop -x SIGKILL +system sh/exec.sh -n dnode3 -s stop sql_error create mnode on dnode 3 print =============== step3: show mnodes