From a3447c5dae2338a853ca84d4f214fa08abaaa1b0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 14:59:47 +0800 Subject: [PATCH 1/8] avoid invalid read/write --- source/libs/transport/src/transSvr.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index cc381d06a7..1d6dbe64ed 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } else { tError("fail to dispatch conn to work thread"); } - uv_close((uv_handle_t*)req->data, uvFreeCb); + if (!uv_is_closing((uv_handle_t*)req->data)) { + uv_close((uv_handle_t*)req->data, uvFreeCb); + } else { + taosMemoryFree(req->data); + } taosMemoryFree(req); } @@ -668,7 +672,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { - uv_close((uv_handle_t*)cli, NULL); + if (!uv_is_closing((uv_handle_t*)cli)) { + uv_close((uv_handle_t*)cli, NULL); + } else { + taosMemoryFree(cli); + } } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { @@ -681,7 +689,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tWarn("failed to create connect:%p", q); taosMemoryFree(buf->base); uv_close((uv_handle_t*)q, NULL); - // taosMemoryFree(q); return; } // free memory allocated by @@ -972,6 +979,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); #endif + ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { tError("failed to bind pipe, errmsg: %s", uv_err_name(ret)); @@ -997,6 +1005,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); if (err == 0) { tDebug("success to create worker-thread:%d", i); @@ -1006,14 +1015,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } + if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + if (false == addHandleToAcceptloop(srv)) { goto End; } + int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); @@ -1022,6 +1034,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; // clear all resource later } + srv->inited = true; return srv; End: From 6ee7b412edd54b8b6bca8b32a0f19bcd2ff39601 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 18:35:59 +0800 Subject: [PATCH 2/8] opt sys index --- source/dnode/vnode/src/meta/metaQuery.c | 8 +++++++- source/libs/executor/src/scanoperator.c | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 0727d32674..0ab7324e7f 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); if (cmp == 0) taosArrayPush(pUids, &p->uid); - if (cmp == -1) break; + + if (param->reverse == false) { + if (cmp == -1) break; + } else if (param->reverse) { + if (cmp == 1) break; + } + valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); if (valid < 0) break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b782313688..6e841f2daa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2880,7 +2880,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { default: return -1; } - return 1; + return cmp; } static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) { From 1421268479ff5e0acdb488921f4732169809495c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 18:59:12 +0800 Subject: [PATCH 3/8] opt sys index --- source/dnode/vnode/src/meta/metaQuery.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 0ab7324e7f..43e80b99d4 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { goto END; } - bool first = true; + int32_t valid = 0; while (1) { void *entryKey = NULL; From afb309c55715606f51fd4858a02731efaf85791e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 19:22:09 +0800 Subject: [PATCH 4/8] opt sys index --- source/dnode/vnode/src/meta/metaTable.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index b6c4b35d9b..f62c2f23ef 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) { } static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { - ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; - ncolKey->uid = pME->uid; + if (pME->type == TSDB_NORMAL_TABLE) { + ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; + ncolKey->uid = pME->uid; + } else { + return -1; + } return 0; } @@ -778,6 +782,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl goto _err; } + // save old entry + SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; + oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; + // search the column to add/drop/update pSchema = &entry.ntbEntry.schemaRow; int32_t iCol = 0; @@ -868,6 +876,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl entry.version = version; + metaDeleteNcolIdx(pMeta, &oldEntry); + metaUpdateNcolIdx(pMeta, &entry); + // do actual write metaWLock(pMeta); From c6683d01af0bf6dac4649635af7e8cf8b93e9470 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 20:32:10 +0800 Subject: [PATCH 5/8] opt trans on no-windows --- source/libs/transport/src/transSvr.c | 55 +++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1d6dbe64ed..d5058a3b29 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -655,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { +#ifdef WINDOWS if (pObj->numOfWorkerReady < pObj->numOfThreads) { tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); uv_close((uv_handle_t*)cli, NULL); return; } +#endif uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; @@ -777,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { return false; } +#ifdef WINDOWS uv_pipe_init(pThrd->loop, pThrd->pipe, 1); +#else + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_open(pThrd->pipe, pThrd->fd); +#endif pThrd->pipe->data = pThrd; @@ -792,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); +#ifdef WINDOWS uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); - // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +#else + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +#endif return true; } @@ -965,20 +975,18 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); + char pipeName[64]; +#ifdef WINDOWS int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { tError("failed to init pipe, errmsg: %s", uv_err_name(ret)); goto End; } -#ifdef WINDOWS - char pipeName[64]; snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); -#else char pipeName[PATH_MAX] = {0}; snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); -#endif ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { @@ -1015,7 +1023,44 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } +#else + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); + + thrd->pTransInst = shandle; + thrd->quit = false; + thrd->pTransInst = shandle; + + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + srv->pThreadObj[i] = thrd; + + uv_os_sock_t fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + goto End; + } + + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + + thrd->pipe = &(srv->pipe[i][1]); // init read + thrd->fd = fds[0]; + + if (false == addHandleToWorkloop(thrd, pipeName)) { + goto End; + } + + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); + if (err == 0) { + tDebug("success to create worker-thread:%d", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread:%d", i); + goto End; + } + } + +#endif if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); From 421d2e7332ca46fcb557be43e654d2f2a2f40b35 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Oct 2022 08:48:05 +0800 Subject: [PATCH 6/8] opt trans on no-windows --- source/dnode/vnode/src/meta/metaTable.c | 4 ++-- source/libs/transport/src/transSvr.c | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index f62c2f23ef..59952a360e 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -781,13 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; goto _err; } + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schemaRow; // save old entry SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; - // search the column to add/drop/update - pSchema = &entry.ntbEntry.schemaRow; int32_t iCol = 0; for (;;) { pColumn = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d5058a3b29..6819068b64 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -975,7 +975,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); - char pipeName[64]; + char pipeName[PATH_MAX]; #ifdef WINDOWS int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { @@ -984,9 +984,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); - char pipeName[PATH_MAX] = {0}; - snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), - taosGetSelfPthreadId()); + // char pipeName[PATH_MAX] = {0}; + // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), + // taosGetSelfPthreadId()); ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { From 0905f16294e942d3b82260774c35a950cb5196bd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Oct 2022 10:17:05 +0800 Subject: [PATCH 7/8] opt trans on no-windows --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6e841f2daa..fb7b0804db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3072,7 +3072,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) { SOperatorNode* pOper = (SOperatorNode*)pNode; EOperatorType opType = pOper->opType; if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN && - OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { + opType != OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { return -1; } return 0; From 209243055603a65ac5d7ae7c03a97bd14fd17c42 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Oct 2022 10:45:47 +0800 Subject: [PATCH 8/8] opt trans on no-windows --- source/libs/executor/src/scanoperator.c | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fb7b0804db..210c8167a8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2986,10 +2986,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) { .val = pVal->datum.p, .reverse = reverse, .filterFunc = func}; - - int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); - if (ret == 0) return 0; - return -1; } @@ -3001,15 +2997,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) { bool reverse = false; __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); - SMetaFltParam param = {.suid = 0, - .cid = 0, - .type = TSDB_DATA_TYPE_BIGINT, - .val = &pVal->datum.i, - .reverse = reverse, - .filterFunc = func}; - int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); if (func == NULL) return -1; - return 0; + + SMetaFltParam param = {.suid = 0, + .cid = 0, + .type = TSDB_DATA_TYPE_BIGINT, + .val = &pVal->datum.i, + .reverse = reverse, + .filterFunc = func}; + + int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); + return ret; } static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) { void* pMeta = ((SSTabFltArg*)arg)->pMeta;