From a3447c5dae2338a853ca84d4f214fa08abaaa1b0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 14:59:47 +0800 Subject: [PATCH 01/12] avoid invalid read/write --- source/libs/transport/src/transSvr.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index cc381d06a7..1d6dbe64ed 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } else { tError("fail to dispatch conn to work thread"); } - uv_close((uv_handle_t*)req->data, uvFreeCb); + if (!uv_is_closing((uv_handle_t*)req->data)) { + uv_close((uv_handle_t*)req->data, uvFreeCb); + } else { + taosMemoryFree(req->data); + } taosMemoryFree(req); } @@ -668,7 +672,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { - uv_close((uv_handle_t*)cli, NULL); + if (!uv_is_closing((uv_handle_t*)cli)) { + uv_close((uv_handle_t*)cli, NULL); + } else { + taosMemoryFree(cli); + } } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { @@ -681,7 +689,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tWarn("failed to create connect:%p", q); taosMemoryFree(buf->base); uv_close((uv_handle_t*)q, NULL); - // taosMemoryFree(q); return; } // free memory allocated by @@ -972,6 +979,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); #endif + ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { tError("failed to bind pipe, errmsg: %s", uv_err_name(ret)); @@ -997,6 +1005,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); if (err == 0) { tDebug("success to create worker-thread:%d", i); @@ -1006,14 +1015,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } + if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + if (false == addHandleToAcceptloop(srv)) { goto End; } + int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); @@ -1022,6 +1034,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; // clear all resource later } + srv->inited = true; return srv; End: From 6ee7b412edd54b8b6bca8b32a0f19bcd2ff39601 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 18:35:59 +0800 Subject: [PATCH 02/12] opt sys index --- source/dnode/vnode/src/meta/metaQuery.c | 8 +++++++- source/libs/executor/src/scanoperator.c | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 0727d32674..0ab7324e7f 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); if (cmp == 0) taosArrayPush(pUids, &p->uid); - if (cmp == -1) break; + + if (param->reverse == false) { + if (cmp == -1) break; + } else if (param->reverse) { + if (cmp == 1) break; + } + valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); if (valid < 0) break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b782313688..6e841f2daa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2880,7 +2880,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { default: return -1; } - return 1; + return cmp; } static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) { From 1421268479ff5e0acdb488921f4732169809495c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 18:59:12 +0800 Subject: [PATCH 03/12] opt sys index --- source/dnode/vnode/src/meta/metaQuery.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 0ab7324e7f..43e80b99d4 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { goto END; } - bool first = true; + int32_t valid = 0; while (1) { void *entryKey = NULL; From afb309c55715606f51fd4858a02731efaf85791e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 19:22:09 +0800 Subject: [PATCH 04/12] opt sys index --- source/dnode/vnode/src/meta/metaTable.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index b6c4b35d9b..f62c2f23ef 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) { } static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { - ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; - ncolKey->uid = pME->uid; + if (pME->type == TSDB_NORMAL_TABLE) { + ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; + ncolKey->uid = pME->uid; + } else { + return -1; + } return 0; } @@ -778,6 +782,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl goto _err; } + // save old entry + SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; + oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; + // search the column to add/drop/update pSchema = &entry.ntbEntry.schemaRow; int32_t iCol = 0; @@ -868,6 +876,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl entry.version = version; + metaDeleteNcolIdx(pMeta, &oldEntry); + metaUpdateNcolIdx(pMeta, &entry); + // do actual write metaWLock(pMeta); From c6683d01af0bf6dac4649635af7e8cf8b93e9470 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 20:32:10 +0800 Subject: [PATCH 05/12] opt trans on no-windows --- source/libs/transport/src/transSvr.c | 55 +++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 1d6dbe64ed..d5058a3b29 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -655,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { +#ifdef WINDOWS if (pObj->numOfWorkerReady < pObj->numOfThreads) { tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); uv_close((uv_handle_t*)cli, NULL); return; } +#endif uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; @@ -777,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { return false; } +#ifdef WINDOWS uv_pipe_init(pThrd->loop, pThrd->pipe, 1); +#else + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_open(pThrd->pipe, pThrd->fd); +#endif pThrd->pipe->data = pThrd; @@ -792,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); +#ifdef WINDOWS uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); - // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +#else + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +#endif return true; } @@ -965,20 +975,18 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); + char pipeName[64]; +#ifdef WINDOWS int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { tError("failed to init pipe, errmsg: %s", uv_err_name(ret)); goto End; } -#ifdef WINDOWS - char pipeName[64]; snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); -#else char pipeName[PATH_MAX] = {0}; snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); -#endif ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { @@ -1015,7 +1023,44 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } +#else + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); + + thrd->pTransInst = shandle; + thrd->quit = false; + thrd->pTransInst = shandle; + + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + srv->pThreadObj[i] = thrd; + + uv_os_sock_t fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + goto End; + } + + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + + thrd->pipe = &(srv->pipe[i][1]); // init read + thrd->fd = fds[0]; + + if (false == addHandleToWorkloop(thrd, pipeName)) { + goto End; + } + + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); + if (err == 0) { + tDebug("success to create worker-thread:%d", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread:%d", i); + goto End; + } + } + +#endif if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); From 421d2e7332ca46fcb557be43e654d2f2a2f40b35 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Oct 2022 08:48:05 +0800 Subject: [PATCH 06/12] opt trans on no-windows --- source/dnode/vnode/src/meta/metaTable.c | 4 ++-- source/libs/transport/src/transSvr.c | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index f62c2f23ef..59952a360e 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -781,13 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; goto _err; } + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schemaRow; // save old entry SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; - // search the column to add/drop/update - pSchema = &entry.ntbEntry.schemaRow; int32_t iCol = 0; for (;;) { pColumn = NULL; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index d5058a3b29..6819068b64 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -975,7 +975,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); - char pipeName[64]; + char pipeName[PATH_MAX]; #ifdef WINDOWS int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { @@ -984,9 +984,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); - char pipeName[PATH_MAX] = {0}; - snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), - taosGetSelfPthreadId()); + // char pipeName[PATH_MAX] = {0}; + // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), + // taosGetSelfPthreadId()); ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { From 0905f16294e942d3b82260774c35a950cb5196bd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Oct 2022 10:17:05 +0800 Subject: [PATCH 07/12] opt trans on no-windows --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6e841f2daa..fb7b0804db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3072,7 +3072,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) { SOperatorNode* pOper = (SOperatorNode*)pNode; EOperatorType opType = pOper->opType; if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN && - OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { + opType != OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { return -1; } return 0; From 209243055603a65ac5d7ae7c03a97bd14fd17c42 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 25 Oct 2022 10:45:47 +0800 Subject: [PATCH 08/12] opt trans on no-windows --- source/libs/executor/src/scanoperator.c | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fb7b0804db..210c8167a8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2986,10 +2986,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) { .val = pVal->datum.p, .reverse = reverse, .filterFunc = func}; - - int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); - if (ret == 0) return 0; - return -1; } @@ -3001,15 +2997,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) { bool reverse = false; __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); - SMetaFltParam param = {.suid = 0, - .cid = 0, - .type = TSDB_DATA_TYPE_BIGINT, - .val = &pVal->datum.i, - .reverse = reverse, - .filterFunc = func}; - int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); if (func == NULL) return -1; - return 0; + + SMetaFltParam param = {.suid = 0, + .cid = 0, + .type = TSDB_DATA_TYPE_BIGINT, + .val = &pVal->datum.i, + .reverse = reverse, + .filterFunc = func}; + + int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); + return ret; } static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) { void* pMeta = ((SSTabFltArg*)arg)->pMeta; From eb7b9d381931890296d9c0e1100761d254d5cdd1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 25 Oct 2022 14:28:34 +0800 Subject: [PATCH 09/12] fix(sync): fix coverity scan issues --- source/libs/sync/src/syncCommit.c | 7 ++++ source/libs/sync/src/syncMain.c | 62 +++++++++++++++++++--------- source/libs/sync/src/syncMessage.c | 20 ++++++++- source/libs/sync/src/syncRaftEntry.c | 2 +- source/libs/sync/src/syncUtil.c | 3 +- 5 files changed, 70 insertions(+), 24 deletions(-) diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 811a7b8e99..3aeb2d30b5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -45,6 +45,11 @@ // /\ UNCHANGED <> // void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { + if (pSyncNode == NULL) { + sError("pSyncNode is NULL"); + return; + } + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); return; @@ -172,6 +177,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->quorum; +#if 0 int32_t quorum = 1; // self int64_t timeNow = taosGetTimestampMs(); @@ -228,6 +234,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { } return quorum; +#endif } /* diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1f24c7c403..09584077f1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -835,7 +835,9 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } - pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + if (pEpSet->numOfEps > 0) { + pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + } sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -1438,12 +1440,13 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - syncNodeEventLog(pSyncNode, "sync close"); if (pSyncNode == NULL) { return; } int32_t ret; + syncNodeEventLog(pSyncNode, "sync close"); + ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); @@ -1879,6 +1882,10 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { } inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { + if (pSyncNode == NULL) { + return; + } + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -1954,6 +1961,10 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { } inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { + if (pSyncNode == NULL) { + return; + } + int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -2937,6 +2948,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); } + syncEntryDestory(pEntry); taosMemoryFree(serialized); syncClientRequestDestroy(pSyncMsg); @@ -3003,13 +3015,14 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { syncPingReply2RpcMsg(pMsgReply, &rpcMsg); /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); - */ + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); +*/ syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncPingReplyDestroy(pMsgReply); return ret; } @@ -3058,6 +3071,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { // reply syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncHeartbeatReplyDestroy(pMsgReply); return 0; } @@ -3329,17 +3343,23 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde return 0; } - // advance commit index to sanpshot first - SSnapshot snapshot = {0}; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, - snapshot.lastApplyIndex); - syncNodeEventLog(ths, eventLog); + if (ths == NULL) { + return -1; + } - // update begin index - beginIndex = snapshot.lastApplyIndex + 1; + if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) { + // advance commit index to sanpshot first + SSnapshot snapshot = {0}; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, + snapshot.lastApplyIndex); + syncNodeEventLog(ths, eventLog); + + // update begin index + beginIndex = snapshot.lastApplyIndex + 1; + } } int32_t code = 0; @@ -3413,8 +3433,10 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // config change finish if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { - code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); - ASSERT(code == 0); + if (rpcMsg.pCont != NULL) { + code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); + ASSERT(code == 0); + } } #if 0 @@ -3528,7 +3550,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->peersNum; ++i) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]); - if (pSender->start) { + if (pSender != NULL && pSender->start) { sError("sync cannot change3"); return false; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 9de3fde389..4001a955fb 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -411,32 +411,40 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); return NULL; } uint32_t len; char* data = NULL; if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); return NULL; } - ASSERT(len = pMsg->dataLen); + ASSERT(len == pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); @@ -673,32 +681,40 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); return NULL; } uint32_t len; char* data = NULL; if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); return NULL; } - ASSERT(len = pMsg->dataLen); + ASSERT(len == pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 940aaca055..6d372acf2f 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -532,7 +532,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry* pEntry = NULL; int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry); if (code == 1) { - *ppEntry = taosMemoryMalloc(pEntry->bytes); + *ppEntry = taosMemoryMalloc((int64_t)(pEntry->bytes)); memcpy(*ppEntry, pEntry, pEntry->bytes); (*ppEntry)->rid = -1; } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 6f234631da..f152201901 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -209,7 +209,8 @@ bool syncUtilCanPrint(char c) { } char* syncUtilprintBin(char* ptr, uint32_t len) { - char* s = taosMemoryMalloc(len + 1); + int64_t memLen = (int64_t)(len + 1); + char* s = taosMemoryMalloc(memLen); ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len); From 88b7ad91dc7fc1ddd1ea29fd985a2e0865435952 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 25 Oct 2022 14:58:26 +0800 Subject: [PATCH 10/12] fix(all): improve coverage rate --- source/util/src/tfunctional.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/util/src/tfunctional.c b/source/util/src/tfunctional.c index 3b51d0046f..d8f1e33324 100644 --- a/source/util/src/tfunctional.c +++ b/source/util/src/tfunctional.c @@ -16,6 +16,9 @@ #define _DEFAULT_SOURCE #include "tfunctional.h" +FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); } + +#if 0 tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int32_t numOfArgs) { tGenericSavedFunc* pSavedFunc = taosMemoryMalloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*))); if (pSavedFunc == NULL) return NULL; @@ -37,10 +40,9 @@ tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int32_t numOfArgs) { return pSavedFunc; } -FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); } - FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); } FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) { if (pSavedFunc) pSavedFunc->func(pSavedFunc->args); } +#endif \ No newline at end of file From a87f915ebfbe411bba450973104bb36a2f5b97f4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Oct 2022 15:30:41 +0800 Subject: [PATCH 11/12] fix(query):set correct column match info size. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 11 ++++++++++- source/libs/executor/src/cachescanoperator.c | 2 +- source/libs/executor/src/executil.c | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ca877f599f..f94dbc51d1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1631,7 +1631,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { return code; } } @@ -3768,6 +3768,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } +bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { + void* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); + if (pBlockScanInfo == NULL) { // no data block for the table of given uid + return false; + } + + +} + static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { ASSERT(pDataBlockInfo != NULL && pReader != NULL); pDataBlockInfo->rows = pReader->pResBlock->info.rows; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 22a685230a..914da422ad 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -279,7 +279,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC } size_t size = taosArrayGetSize(pColMatchInfo->pList); - SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchInfo)); + SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); for (int32_t i = 0; i < size; ++i) { SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f58c2a0e34..971b28eb09 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1073,7 +1073,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod pMatchInfo->matchType = type; - SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem)); if (pList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; From 6acdfdd9abdf2d112d7906b45d95bd3be151a5b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Oct 2022 16:20:51 +0800 Subject: [PATCH 12/12] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f94dbc51d1..71828882c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -111,7 +111,7 @@ typedef struct SDataBlockIter { int32_t index; SArray* blockList; // SArray int32_t order; - SDataBlk block; // current SDataBlk data + SDataBlk block; // current SDataBlk data SHashObj* pTableMap; } SDataBlockIter; @@ -1209,19 +1209,19 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer); } -static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, +static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order) { bool asc = ASCENDING_TRAVERSE(order); - if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { + if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { return NULL; } - if (!asc && pFBlockInfo->tbBlockIdx == 0) { + if (!asc && pBlockInfo->tbBlockIdx == 0) { return NULL; } int32_t step = asc ? 1 : -1; - *nextIndex = pFBlockInfo->tbBlockIdx + step; + *nextIndex = pBlockInfo->tbBlockIdx + step; SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk)); int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); @@ -3769,12 +3769,12 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { - void* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; } - + return true; } static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {