diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 4f3330b7b3..bbcc654ae2 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -82,7 +82,9 @@ class FstReadMemory { bool init() { char* buf = (char*)calloc(1, sizeof(char) * _size); int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); - if (nRead <= 0) { return false; } + if (nRead <= 0) { + return false; + } _size = nRead; _s = fstSliceCreate((uint8_t*)buf, _size); _fst = fstCreate(&_s); @@ -108,7 +110,9 @@ class FstReadMemory { StreamWithState* st = streamBuilderIntoStream(sb); StreamWithStateResult* rt = NULL; - while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { result.push_back((uint64_t)(rt->out.out)); } + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + result.push_back((uint64_t)(rt->out.out)); + } return true; } bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector& result) { @@ -184,7 +188,9 @@ void checkFstPerf() { delete fw; FstReadMemory* m = new FstReadMemory(1024 * 64); - if (m->init()) { printf("success to init fst read"); } + if (m->init()) { + printf("success to init fst read"); + } Performance_fstReadRecords(m); delete m; } @@ -348,7 +354,9 @@ class TFileObj { tfileReaderDestroy(reader_); reader_ = NULL; } - if (writer_ == NULL) { InitWriter(); } + if (writer_ == NULL) { + InitWriter(); + } return tfileWriterPut(writer_, tv, false); } bool InitWriter() { @@ -388,8 +396,12 @@ class TFileObj { return tfileReaderSearch(reader_, query, result); } ~TFileObj() { - if (writer_) { tfileWriterDestroy(writer_); } - if (reader_) { tfileReaderDestroy(reader_); } + if (writer_) { + tfileWriterDestroy(writer_); + } + if (reader_) { + tfileReaderDestroy(reader_); + } } private: @@ -912,7 +924,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { } TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { std::string path = "/tmp/cache_and_tfile"; - if (index->Init(path) != 0) {} + if (index->Init(path) != 0) { + } std::thread threads[NUM_OF_THREAD]; for (int i = 0; i < NUM_OF_THREAD; i++) { @@ -927,14 +940,24 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { TEST_F(IndexEnv2, testIndex_restart) { std::string path = "/tmp/cache_and_tfile"; - if (index->Init(path) != 0) {} + if (index->Init(path) != 0) { + } + index->SearchOneTarget("tag1", "Hello", 10); + index->SearchOneTarget("tag2", "Test", 10); +} +TEST_F(IndexEnv2, testIndex_restart1) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + index->ReadMultiMillonData("tag1", "coding"); index->SearchOneTarget("tag1", "Hello", 10); index->SearchOneTarget("tag2", "Test", 10); } TEST_F(IndexEnv2, testIndex_read_performance) { std::string path = "/tmp/cache_and_tfile"; - if (index->Init(path) != 0) {} + if (index->Init(path) != 0) { + } index->PutOneTarge("tag1", "Hello", 12); index->PutOneTarge("tag1", "Hello", 15); index->ReadMultiMillonData("tag1", "Hello"); @@ -943,17 +966,84 @@ TEST_F(IndexEnv2, testIndex_read_performance) { } TEST_F(IndexEnv2, testIndexMultiTag) { std::string path = "/tmp/multi_tag"; - if (index->Init(path) != 0) {} + if (index->Init(path) != 0) { + } int64_t st = taosGetTimestampUs(); int32_t num = 1000 * 10000; index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); } -TEST_F(IndexEnv2, testLongComVal) { +TEST_F(IndexEnv2, testLongComVal1) { std::string path = "/tmp/long_colVal"; - if (index->Init(path) != 0) {} + if (index->Init(path) != 0) { + } // gen colVal by randstr std::string randstr = "xxxxxxxxxxxxxxxxx"; index->WriteMultiMillonData("tag1", randstr, 100 * 10000); } + +TEST_F(IndexEnv2, testLongComVal2) { + std::string path = "/tmp/long_colVal"; + if (index->Init(path) != 0) { + } + // gen colVal by randstr + std::string randstr = "abcccc fdadfafdafda"; + index->WriteMultiMillonData("tag1", randstr, 100 * 10000); +} +TEST_F(IndexEnv2, testLongComVal3) { + std::string path = "/tmp/long_colVal"; + if (index->Init(path) != 0) { + } + // gen colVal by randstr + std::string randstr = "Yes, coding and coding and coding"; + index->WriteMultiMillonData("tag1", randstr, 100 * 10000); +} +TEST_F(IndexEnv2, testLongComVal4) { + std::string path = "/tmp/long_colVal"; + if (index->Init(path) != 0) { + } + // gen colVal by randstr + std::string randstr = "111111 bac fdadfa"; + index->WriteMultiMillonData("tag1", randstr, 100 * 10000); +} +TEST_F(IndexEnv2, testIndex_read_performance1) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + index->PutOneTarge("tag1", "Hello", 12); + index->PutOneTarge("tag1", "Hello", 15); + index->ReadMultiMillonData("tag1", "Hello", 1000); + std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + assert(3 == index->SearchOne("tag1", "Hello")); +} +TEST_F(IndexEnv2, testIndex_read_performance2) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + index->PutOneTarge("tag1", "Hello", 12); + index->PutOneTarge("tag1", "Hello", 15); + index->ReadMultiMillonData("tag1", "Hello", 1000 * 10); + std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + assert(3 == index->SearchOne("tag1", "Hello")); +} +TEST_F(IndexEnv2, testIndex_read_performance3) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + index->PutOneTarge("tag1", "Hello", 12); + index->PutOneTarge("tag1", "Hello", 15); + index->ReadMultiMillonData("tag1", "Hello", 1000 * 100); + std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + assert(3 == index->SearchOne("tag1", "Hello")); +} +TEST_F(IndexEnv2, testIndex_read_performance4) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) { + } + index->PutOneTarge("tag10", "Hello", 12); + index->PutOneTarge("tag12", "Hello", 15); + index->ReadMultiMillonData("tag10", "Hello", 1000 * 100); + std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + assert(3 == index->SearchOne("tag10", "Hello")); +} diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index cf1e153965..4b490936cc 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -70,6 +70,7 @@ int32_t rpcInit(void) { void rpcCleanup(void) { // impl later + // return; } #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 86e9c05ccb..f2d844f73d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,6 +26,7 @@ typedef struct SCliConn { queue conn; char spi; char secured; + uint64_t expireTime; } SCliConn; typedef struct SCliMsg { @@ -39,10 +40,13 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; uv_async_t* cliAsync; // - void* cache; // conn pool + uv_timer_t* pTimer; + void* cache; // conn pool queue msg; pthread_mutex_t msgMtx; - void* shandle; + uint64_t nextTimeout; // next timeout + void* shandle; // + } SCliThrdObj; typedef struct SClientObj { @@ -52,10 +56,19 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; +typedef struct SConnList { + queue conn; +} SConnList; + // conn pool +// add expire timeout and capacity limit +static void* connCacheCreate(int size); +static void* connCacheDestroy(void* cache); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +// register timer in each thread to clear expire conn +static void clientTimeoutCb(uv_timer_t* handle); // process data read from server, auth/decompress etc static void clientProcessData(SCliConn* conn); // check whether already read complete packet from server @@ -77,10 +90,93 @@ static void clientMsgDestroy(SCliMsg* pMsg); static void* clientThread(void* arg); static void clientProcessData(SCliConn* conn) { + STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; + SRpcInfo* pRpc = pCtx->ahandle; + SRpcMsg rpcMsg; + + rpcMsg.pCont = conn->readBuf.buf; + rpcMsg.contLen = conn->readBuf.len; + rpcMsg.ahandle = pCtx->ahandle; + (pRpc->cfp)(NULL, &rpcMsg, NULL); // impl } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientTimeoutCb(uv_timer_t* handle) { + SCliThrdObj* pThrd = handle->data; + SRpcInfo* pRpc = pThrd->shandle; + int64_t currentTime = pThrd->nextTimeout; + + SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); + while (p != NULL) { + while (!QUEUE_IS_EMPTY(&p->conn)) { + queue* h = QUEUE_HEAD(&p->conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + if (c->expireTime < currentTime) { + QUEUE_REMOVE(h); + clientConnDestroy(c); + } else { + break; + } + } + p = taosHashIterate((SHashObj*)pThrd->cache, p); + } + + pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0); +} +static void* connCacheCreate(int size) { + SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + return false; +} +static void* connCacheDestroy(void* cache) { + SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); + while (connList != NULL) { + while (!QUEUE_IS_EMPTY(&connList->conn)) { + queue* h = QUEUE_HEAD(&connList->conn); + QUEUE_REMOVE(h); + SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + clientConnDestroy(c); + } + connList = taosHashIterate((SHashObj*)cache, connList); + } + taosHashClear(cache); +} + +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { + char key[128] = {0}; + tstrncpy(key, ip, strlen(ip)); + tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + + SHashObj* pCache = cache; + SConnList* plist = taosHashGet(pCache, key, strlen(key)); + if (plist == NULL) { + SConnList list; + plist = &list; + QUEUE_INIT(&plist->conn); + taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + } + + if (QUEUE_IS_EMPTY(&plist->conn)) { + return NULL; + } + queue* h = QUEUE_HEAD(&plist->conn); + QUEUE_REMOVE(h); + return QUEUE_DATA(h, SCliConn, conn); +} +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { + char key[128] = {0}; + tstrncpy(key, ip, strlen(ip)); + tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + + STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx; + SRpcInfo* pRpc = ctx->pRpc; + conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); + // list already create before + assert(plist != NULL); + QUEUE_PUSH(&plist->conn, &conn->conn); +} static bool clientReadComplete(SConnBuffer* data) { STransMsgHead head; int32_t headLen = sizeof(head); @@ -152,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) { } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; + QUEUE_REMOVE(&conn->conn); clientConnDestroy(conn); } @@ -206,15 +303,6 @@ static void clientConnCb(uv_connect_t* req, int status) { clientWrite(pConn); } -static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { - // impl later - - return NULL; -} -static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { - // impl later -} - static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -234,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->writeReq = malloc(sizeof(uv_write_t)); + QUEUE_INIT(&conn->conn); conn->connReq.data = conn; conn->data = pMsg; @@ -270,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) { static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; + SRpcInfo* pRpc = pThrd->shandle; + pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0); uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -291,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); pThrd->cliAsync->data = pThrd; + pThrd->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->pTimer); + pThrd->shandle = shandle; + int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { tDebug("sucess to create tranport-client thread %d", i); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index d096ab7813..4542541043 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) { // refers specifically to query or insert timeout static void uvHandleActivityTimeout(uv_timer_t* handle) { - // impl later SConn* conn = handle->data; + tDebug("%p timeout since no activity", conn); } static void uvProcessData(SConn* pConn) { @@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) { SRpcInfo* pRpc = (SRpcInfo*)p->shandle; // auth here + // auth should not do in rpc thread - int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); - if (code != 0) { - terrno = code; - return; - } + // int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); + // if (code != 0) { + // terrno = code; + // return; + //} pHead->code = htonl(pHead->code); int32_t dlen = 0; @@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) { } else { // impl later } - rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); + rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; @@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) { return; } uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + + uv_timer_stop(conn->pTimer); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } } diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 6339e58560..4ccbb60cc2 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (pEpSet) pInfo->epSet = *pEpSet; rpcFreeCont(pMsg->pCont); + // tsem_post(&pInfo->rspSem); tsem_post(&pInfo->rspSem); } @@ -60,6 +61,7 @@ static void *sendRequest(void *param) { // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + // tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem); }