diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 0727d32674..43e80b99d4 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { goto END; } - bool first = true; + int32_t valid = 0; while (1) { void *entryKey = NULL; @@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); if (cmp == 0) taosArrayPush(pUids, &p->uid); - if (cmp == -1) break; + + if (param->reverse == false) { + if (cmp == -1) break; + } else if (param->reverse) { + if (cmp == 1) break; + } + valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); if (valid < 0) break; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 9974e22572..1196b512d3 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) { } static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { - ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; - ncolKey->uid = pME->uid; + if (pME->type == TSDB_NORMAL_TABLE) { + ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; + ncolKey->uid = pME->uid; + } else { + return -1; + } return 0; } @@ -777,9 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; goto _err; } - // search the column to add/drop/update pSchema = &entry.ntbEntry.schemaRow; + + // save old entry + SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid}; + oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols; + int32_t iCol = 0; for (;;) { pColumn = NULL; @@ -872,6 +880,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl entry.version = version; + metaDeleteNcolIdx(pMeta, &oldEntry); + metaUpdateNcolIdx(pMeta, &entry); + // do actual write metaWLock(pMeta); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ca877f599f..71828882c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -111,7 +111,7 @@ typedef struct SDataBlockIter { int32_t index; SArray* blockList; // SArray int32_t order; - SDataBlk block; // current SDataBlk data + SDataBlk block; // current SDataBlk data SHashObj* pTableMap; } SDataBlockIter; @@ -1209,19 +1209,19 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer); } -static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, +static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order) { bool asc = ASCENDING_TRAVERSE(order); - if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { + if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { return NULL; } - if (!asc && pFBlockInfo->tbBlockIdx == 0) { + if (!asc && pBlockInfo->tbBlockIdx == 0) { return NULL; } int32_t step = asc ? 1 : -1; - *nextIndex = pFBlockInfo->tbBlockIdx + step; + *nextIndex = pBlockInfo->tbBlockIdx + step; SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk)); int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); @@ -1631,7 +1631,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { return code; } } @@ -3768,6 +3768,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } +bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); + if (pBlockScanInfo == NULL) { // no data block for the table of given uid + return false; + } + + return true; +} + static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { ASSERT(pDataBlockInfo != NULL && pReader != NULL); pDataBlockInfo->rows = pReader->pResBlock->info.rows; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 22a685230a..914da422ad 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -279,7 +279,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC } size_t size = taosArrayGetSize(pColMatchInfo->pList); - SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchInfo)); + SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem)); for (int32_t i = 0; i < size; ++i) { SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f58c2a0e34..971b28eb09 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1073,7 +1073,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod pMatchInfo->matchType = type; - SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchItem)); if (pList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9773a0c8e7..7bbfccd461 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2881,7 +2881,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { default: return -1; } - return 1; + return cmp; } static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) { @@ -2987,10 +2987,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) { .val = pVal->datum.p, .reverse = reverse, .filterFunc = func}; - - int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); - if (ret == 0) return 0; - return -1; } @@ -3002,15 +2998,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) { bool reverse = false; __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); - SMetaFltParam param = {.suid = 0, - .cid = 0, - .type = TSDB_DATA_TYPE_BIGINT, - .val = &pVal->datum.i, - .reverse = reverse, - .filterFunc = func}; - int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); if (func == NULL) return -1; - return 0; + + SMetaFltParam param = {.suid = 0, + .cid = 0, + .type = TSDB_DATA_TYPE_BIGINT, + .val = &pVal->datum.i, + .reverse = reverse, + .filterFunc = func}; + + int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); + return ret; } static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) { void* pMeta = ((SSTabFltArg*)arg)->pMeta; @@ -3073,7 +3071,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) { SOperatorNode* pOper = (SOperatorNode*)pNode; EOperatorType opType = pOper->opType; if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN && - OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { + opType != OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { return -1; } return 0; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 811a7b8e99..3aeb2d30b5 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -45,6 +45,11 @@ // /\ UNCHANGED <> // void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { + if (pSyncNode == NULL) { + sError("pSyncNode is NULL"); + return; + } + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); return; @@ -172,6 +177,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->quorum; +#if 0 int32_t quorum = 1; // self int64_t timeNow = taosGetTimestampMs(); @@ -228,6 +234,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { } return quorum; +#endif } /* diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1f24c7c403..09584077f1 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -835,7 +835,9 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } - pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + if (pEpSet->numOfEps > 0) { + pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + } sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -1438,12 +1440,13 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - syncNodeEventLog(pSyncNode, "sync close"); if (pSyncNode == NULL) { return; } int32_t ret; + syncNodeEventLog(pSyncNode, "sync close"); + ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); @@ -1879,6 +1882,10 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { } inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { + if (pSyncNode == NULL) { + return; + } + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -1954,6 +1961,10 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { } inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { + if (pSyncNode == NULL) { + return; + } + int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -2937,6 +2948,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); } + syncEntryDestory(pEntry); taosMemoryFree(serialized); syncClientRequestDestroy(pSyncMsg); @@ -3003,13 +3015,14 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { syncPingReply2RpcMsg(pMsgReply, &rpcMsg); /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); - */ + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); +*/ syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncPingReplyDestroy(pMsgReply); return ret; } @@ -3058,6 +3071,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { // reply syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncHeartbeatReplyDestroy(pMsgReply); return 0; } @@ -3329,17 +3343,23 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde return 0; } - // advance commit index to sanpshot first - SSnapshot snapshot = {0}; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, - snapshot.lastApplyIndex); - syncNodeEventLog(ths, eventLog); + if (ths == NULL) { + return -1; + } - // update begin index - beginIndex = snapshot.lastApplyIndex + 1; + if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) { + // advance commit index to sanpshot first + SSnapshot snapshot = {0}; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, + snapshot.lastApplyIndex); + syncNodeEventLog(ths, eventLog); + + // update begin index + beginIndex = snapshot.lastApplyIndex + 1; + } } int32_t code = 0; @@ -3413,8 +3433,10 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // config change finish if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { - code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); - ASSERT(code == 0); + if (rpcMsg.pCont != NULL) { + code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); + ASSERT(code == 0); + } } #if 0 @@ -3528,7 +3550,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->peersNum; ++i) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]); - if (pSender->start) { + if (pSender != NULL && pSender->start) { sError("sync cannot change3"); return false; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 9de3fde389..4001a955fb 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -411,32 +411,40 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); return NULL; } uint32_t len; char* data = NULL; if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); return NULL; } - ASSERT(len = pMsg->dataLen); + ASSERT(len == pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); @@ -673,32 +681,40 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); return NULL; } uint32_t len; char* data = NULL; if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); return NULL; } - ASSERT(len = pMsg->dataLen); + ASSERT(len == pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 940aaca055..6d372acf2f 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -532,7 +532,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry* pEntry = NULL; int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry); if (code == 1) { - *ppEntry = taosMemoryMalloc(pEntry->bytes); + *ppEntry = taosMemoryMalloc((int64_t)(pEntry->bytes)); memcpy(*ppEntry, pEntry, pEntry->bytes); (*ppEntry)->rid = -1; } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 6f234631da..f152201901 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -209,7 +209,8 @@ bool syncUtilCanPrint(char c) { } char* syncUtilprintBin(char* ptr, uint32_t len) { - char* s = taosMemoryMalloc(len + 1); + int64_t memLen = (int64_t)(len + 1); + char* s = taosMemoryMalloc(memLen); ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index cc381d06a7..6819068b64 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } else { tError("fail to dispatch conn to work thread"); } - uv_close((uv_handle_t*)req->data, uvFreeCb); + if (!uv_is_closing((uv_handle_t*)req->data)) { + uv_close((uv_handle_t*)req->data, uvFreeCb); + } else { + taosMemoryFree(req->data); + } taosMemoryFree(req); } @@ -651,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { +#ifdef WINDOWS if (pObj->numOfWorkerReady < pObj->numOfThreads) { tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); uv_close((uv_handle_t*)cli, NULL); return; } +#endif uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; @@ -668,7 +674,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { - uv_close((uv_handle_t*)cli, NULL); + if (!uv_is_closing((uv_handle_t*)cli)) { + uv_close((uv_handle_t*)cli, NULL); + } else { + taosMemoryFree(cli); + } } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { @@ -681,7 +691,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tWarn("failed to create connect:%p", q); taosMemoryFree(buf->base); uv_close((uv_handle_t*)q, NULL); - // taosMemoryFree(q); return; } // free memory allocated by @@ -770,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { return false; } +#ifdef WINDOWS uv_pipe_init(pThrd->loop, pThrd->pipe, 1); +#else + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_open(pThrd->pipe, pThrd->fd); +#endif pThrd->pipe->data = pThrd; @@ -785,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); +#ifdef WINDOWS uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); - // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +#else + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +#endif return true; } @@ -958,20 +975,19 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); + char pipeName[PATH_MAX]; +#ifdef WINDOWS int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { tError("failed to init pipe, errmsg: %s", uv_err_name(ret)); goto End; } -#ifdef WINDOWS - char pipeName[64]; snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); -#else - char pipeName[PATH_MAX] = {0}; - snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), - taosGetSelfPthreadId()); -#endif + // char pipeName[PATH_MAX] = {0}; + // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), + // taosGetSelfPthreadId()); + ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { tError("failed to bind pipe, errmsg: %s", uv_err_name(ret)); @@ -997,6 +1013,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); if (err == 0) { tDebug("success to create worker-thread:%d", i); @@ -1006,14 +1023,54 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } +#else + + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); + + thrd->pTransInst = shandle; + thrd->quit = false; + thrd->pTransInst = shandle; + + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + srv->pThreadObj[i] = thrd; + + uv_os_sock_t fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + goto End; + } + + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + + thrd->pipe = &(srv->pipe[i][1]); // init read + thrd->fd = fds[0]; + + if (false == addHandleToWorkloop(thrd, pipeName)) { + goto End; + } + + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); + if (err == 0) { + tDebug("success to create worker-thread:%d", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread:%d", i); + goto End; + } + } + +#endif if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + if (false == addHandleToAcceptloop(srv)) { goto End; } + int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); @@ -1022,6 +1079,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; // clear all resource later } + srv->inited = true; return srv; End: diff --git a/source/util/src/tfunctional.c b/source/util/src/tfunctional.c index 3b51d0046f..d8f1e33324 100644 --- a/source/util/src/tfunctional.c +++ b/source/util/src/tfunctional.c @@ -16,6 +16,9 @@ #define _DEFAULT_SOURCE #include "tfunctional.h" +FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); } + +#if 0 tGenericSavedFunc* genericSavedFuncInit(GenericVaFunc func, int32_t numOfArgs) { tGenericSavedFunc* pSavedFunc = taosMemoryMalloc(sizeof(tGenericSavedFunc) + numOfArgs * (sizeof(void*))); if (pSavedFunc == NULL) return NULL; @@ -37,10 +40,9 @@ tVoidSavedFunc* voidSavedFuncInit(VoidVaFunc func, int32_t numOfArgs) { return pSavedFunc; } -FORCE_INLINE void* genericInvoke(tGenericSavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); } - FORCE_INLINE int32_t i32Invoke(tI32SavedFunc* const pSavedFunc) { return pSavedFunc->func(pSavedFunc->args); } FORCE_INLINE void voidInvoke(tVoidSavedFunc* const pSavedFunc) { if (pSavedFunc) pSavedFunc->func(pSavedFunc->args); } +#endif \ No newline at end of file