Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/3.0_liaohj

This commit is contained in:
Haojun Liao 2022-01-21 11:19:49 +08:00
commit 07b62707cc
5 changed files with 223 additions and 30 deletions

View File

@ -82,7 +82,9 @@ class FstReadMemory {
bool init() { bool init() {
char* buf = (char*)calloc(1, sizeof(char) * _size); char* buf = (char*)calloc(1, sizeof(char) * _size);
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
if (nRead <= 0) { return false; } if (nRead <= 0) {
return false;
}
_size = nRead; _size = nRead;
_s = fstSliceCreate((uint8_t*)buf, _size); _s = fstSliceCreate((uint8_t*)buf, _size);
_fst = fstCreate(&_s); _fst = fstCreate(&_s);
@ -108,7 +110,9 @@ class FstReadMemory {
StreamWithState* st = streamBuilderIntoStream(sb); StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL; StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { result.push_back((uint64_t)(rt->out.out)); } while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
result.push_back((uint64_t)(rt->out.out));
}
return true; return true;
} }
bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) { bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) {
@ -184,7 +188,9 @@ void checkFstPerf() {
delete fw; delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64); FstReadMemory* m = new FstReadMemory(1024 * 64);
if (m->init()) { printf("success to init fst read"); } if (m->init()) {
printf("success to init fst read");
}
Performance_fstReadRecords(m); Performance_fstReadRecords(m);
delete m; delete m;
} }
@ -348,7 +354,9 @@ class TFileObj {
tfileReaderDestroy(reader_); tfileReaderDestroy(reader_);
reader_ = NULL; reader_ = NULL;
} }
if (writer_ == NULL) { InitWriter(); } if (writer_ == NULL) {
InitWriter();
}
return tfileWriterPut(writer_, tv, false); return tfileWriterPut(writer_, tv, false);
} }
bool InitWriter() { bool InitWriter() {
@ -388,8 +396,12 @@ class TFileObj {
return tfileReaderSearch(reader_, query, result); return tfileReaderSearch(reader_, query, result);
} }
~TFileObj() { ~TFileObj() {
if (writer_) { tfileWriterDestroy(writer_); } if (writer_) {
if (reader_) { tfileReaderDestroy(reader_); } tfileWriterDestroy(writer_);
}
if (reader_) {
tfileReaderDestroy(reader_);
}
} }
private: private:
@ -912,7 +924,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
} }
TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
std::string path = "/tmp/cache_and_tfile"; std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {
}
std::thread threads[NUM_OF_THREAD]; std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) { for (int i = 0; i < NUM_OF_THREAD; i++) {
@ -927,14 +940,24 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
TEST_F(IndexEnv2, testIndex_restart) { TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp/cache_and_tfile"; std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {
}
index->SearchOneTarget("tag1", "Hello", 10);
index->SearchOneTarget("tag2", "Test", 10);
}
TEST_F(IndexEnv2, testIndex_restart1) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
index->ReadMultiMillonData("tag1", "coding");
index->SearchOneTarget("tag1", "Hello", 10); index->SearchOneTarget("tag1", "Hello", 10);
index->SearchOneTarget("tag2", "Test", 10); index->SearchOneTarget("tag2", "Test", 10);
} }
TEST_F(IndexEnv2, testIndex_read_performance) { TEST_F(IndexEnv2, testIndex_read_performance) {
std::string path = "/tmp/cache_and_tfile"; std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {
}
index->PutOneTarge("tag1", "Hello", 12); index->PutOneTarge("tag1", "Hello", 12);
index->PutOneTarge("tag1", "Hello", 15); index->PutOneTarge("tag1", "Hello", 15);
index->ReadMultiMillonData("tag1", "Hello"); index->ReadMultiMillonData("tag1", "Hello");
@ -943,17 +966,84 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
} }
TEST_F(IndexEnv2, testIndexMultiTag) { TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp/multi_tag"; std::string path = "/tmp/multi_tag";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {
}
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t num = 1000 * 10000; int32_t num = 1000 * 10000;
index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
} }
TEST_F(IndexEnv2, testLongComVal) { TEST_F(IndexEnv2, testLongComVal1) {
std::string path = "/tmp/long_colVal"; std::string path = "/tmp/long_colVal";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {
}
// gen colVal by randstr // gen colVal by randstr
std::string randstr = "xxxxxxxxxxxxxxxxx"; std::string randstr = "xxxxxxxxxxxxxxxxx";
index->WriteMultiMillonData("tag1", randstr, 100 * 10000); index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
} }
TEST_F(IndexEnv2, testLongComVal2) {
std::string path = "/tmp/long_colVal";
if (index->Init(path) != 0) {
}
// gen colVal by randstr
std::string randstr = "abcccc fdadfafdafda";
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
}
TEST_F(IndexEnv2, testLongComVal3) {
std::string path = "/tmp/long_colVal";
if (index->Init(path) != 0) {
}
// gen colVal by randstr
std::string randstr = "Yes, coding and coding and coding";
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
}
TEST_F(IndexEnv2, testLongComVal4) {
std::string path = "/tmp/long_colVal";
if (index->Init(path) != 0) {
}
// gen colVal by randstr
std::string randstr = "111111 bac fdadfa";
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
}
TEST_F(IndexEnv2, testIndex_read_performance1) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
index->PutOneTarge("tag1", "Hello", 12);
index->PutOneTarge("tag1", "Hello", 15);
index->ReadMultiMillonData("tag1", "Hello", 1000);
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag1", "Hello"));
}
TEST_F(IndexEnv2, testIndex_read_performance2) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
index->PutOneTarge("tag1", "Hello", 12);
index->PutOneTarge("tag1", "Hello", 15);
index->ReadMultiMillonData("tag1", "Hello", 1000 * 10);
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag1", "Hello"));
}
TEST_F(IndexEnv2, testIndex_read_performance3) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
index->PutOneTarge("tag1", "Hello", 12);
index->PutOneTarge("tag1", "Hello", 15);
index->ReadMultiMillonData("tag1", "Hello", 1000 * 100);
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag1", "Hello"));
}
TEST_F(IndexEnv2, testIndex_read_performance4) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
}
index->PutOneTarge("tag10", "Hello", 12);
index->PutOneTarge("tag12", "Hello", 15);
index->ReadMultiMillonData("tag10", "Hello", 1000 * 100);
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag10", "Hello"));
}

View File

@ -70,6 +70,7 @@ int32_t rpcInit(void) {
void rpcCleanup(void) { void rpcCleanup(void) {
// impl later // impl later
//
return; return;
} }
#endif #endif

View File

@ -26,6 +26,7 @@ typedef struct SCliConn {
queue conn; queue conn;
char spi; char spi;
char secured; char secured;
uint64_t expireTime;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
@ -39,10 +40,13 @@ typedef struct SCliThrdObj {
pthread_t thread; pthread_t thread;
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* cliAsync; // uv_async_t* cliAsync; //
uv_timer_t* pTimer;
void* cache; // conn pool void* cache; // conn pool
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
void* shandle; uint64_t nextTimeout; // next timeout
void* shandle; //
} SCliThrdObj; } SCliThrdObj;
typedef struct SClientObj { typedef struct SClientObj {
@ -52,10 +56,19 @@ typedef struct SClientObj {
SCliThrdObj** pThreadObj; SCliThrdObj** pThreadObj;
} SClientObj; } SClientObj;
typedef struct SConnList {
queue conn;
} SConnList;
// conn pool // conn pool
// add expire timeout and capacity limit
static void* connCacheCreate(int size);
static void* connCacheDestroy(void* cache);
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port);
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn);
// register timer in each thread to clear expire conn
static void clientTimeoutCb(uv_timer_t* handle);
// process data read from server, auth/decompress etc // process data read from server, auth/decompress etc
static void clientProcessData(SCliConn* conn); static void clientProcessData(SCliConn* conn);
// check whether already read complete packet from server // check whether already read complete packet from server
@ -77,10 +90,93 @@ static void clientMsgDestroy(SCliMsg* pMsg);
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void clientProcessData(SCliConn* conn) { static void clientProcessData(SCliConn* conn) {
STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = pCtx->ahandle;
SRpcMsg rpcMsg;
rpcMsg.pCont = conn->readBuf.buf;
rpcMsg.contLen = conn->readBuf.len;
rpcMsg.ahandle = pCtx->ahandle;
(pRpc->cfp)(NULL, &rpcMsg, NULL);
// impl // impl
} }
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->shandle;
int64_t currentTime = pThrd->nextTimeout;
SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL);
while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
if (c->expireTime < currentTime) {
QUEUE_REMOVE(h);
clientConnDestroy(c);
} else {
break;
}
}
p = taosHashIterate((SHashObj*)pThrd->cache, p);
}
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0);
}
static void* connCacheCreate(int size) {
SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
return false;
}
static void* connCacheDestroy(void* cache) {
SConnList* connList = taosHashIterate((SHashObj*)cache, NULL);
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn);
QUEUE_REMOVE(h);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
clientConnDestroy(c);
}
connList = taosHashIterate((SHashObj*)cache, connList);
}
taosHashClear(cache);
}
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
char key[128] = {0};
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
SHashObj* pCache = cache;
SConnList* plist = taosHashGet(pCache, key, strlen(key));
if (plist == NULL) {
SConnList list;
plist = &list;
QUEUE_INIT(&plist->conn);
taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist));
}
if (QUEUE_IS_EMPTY(&plist->conn)) {
return NULL;
}
queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h);
return QUEUE_DATA(h, SCliConn, conn);
}
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
char key[128] = {0};
tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx;
SRpcInfo* pRpc = ctx->pRpc;
conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key));
// list already create before
assert(plist != NULL);
QUEUE_PUSH(&plist->conn, &conn->conn);
}
static bool clientReadComplete(SConnBuffer* data) { static bool clientReadComplete(SConnBuffer* data) {
STransMsgHead head; STransMsgHead head;
int32_t headLen = sizeof(head); int32_t headLen = sizeof(head);
@ -152,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) {
} }
static void clientDestroy(uv_handle_t* handle) { static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
QUEUE_REMOVE(&conn->conn);
clientConnDestroy(conn); clientConnDestroy(conn);
} }
@ -206,15 +303,6 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite(pConn); clientWrite(pConn);
} }
static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
// impl later
return NULL;
}
static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) {
// impl later
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
@ -234,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq = malloc(sizeof(uv_write_t));
QUEUE_INIT(&conn->conn);
conn->connReq.data = conn; conn->connReq.data = conn;
conn->data = pMsg; conn->data = pMsg;
@ -270,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) {
static void* clientThread(void* arg) { static void* clientThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg; SCliThrdObj* pThrd = (SCliThrdObj*)arg;
SRpcInfo* pRpc = pThrd->shandle;
pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10;
uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0);
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
@ -291,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
pThrd->cliAsync->data = pThrd; pThrd->cliAsync->data = pThrd;
pThrd->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->pTimer);
pThrd->shandle = shandle; pThrd->shandle = shandle;
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
if (err == 0) { if (err == 0) {
tDebug("sucess to create tranport-client thread %d", i); tDebug("sucess to create tranport-client thread %d", i);

View File

@ -210,8 +210,8 @@ static int uvAuthMsg(SConn* pConn, char* msg, int len) {
// refers specifically to query or insert timeout // refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) { static void uvHandleActivityTimeout(uv_timer_t* handle) {
// impl later
SConn* conn = handle->data; SConn* conn = handle->data;
tDebug("%p timeout since no activity", conn);
} }
static void uvProcessData(SConn* pConn) { static void uvProcessData(SConn* pConn) {
@ -232,12 +232,13 @@ static void uvProcessData(SConn* pConn) {
SRpcInfo* pRpc = (SRpcInfo*)p->shandle; SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
// auth here // auth here
// auth should not do in rpc thread
int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); // int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
if (code != 0) { // if (code != 0) {
terrno = code; // terrno = code;
return; // return;
} //}
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
int32_t dlen = 0; int32_t dlen = 0;
@ -248,7 +249,7 @@ static void uvProcessData(SConn* pConn) {
} else { } else {
// impl later // impl later
} }
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content; rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code; rpcMsg.code = pHead->code;
@ -318,6 +319,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
return; return;
} }
uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
uv_timer_stop(conn->pTimer);
uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
} }
} }

View File

@ -40,6 +40,7 @@ static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pEpSet) pInfo->epSet = *pEpSet; if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
// tsem_post(&pInfo->rspSem);
tsem_post(&pInfo->rspSem); tsem_post(&pInfo->rspSem);
} }
@ -60,6 +61,7 @@ static void *sendRequest(void *param) {
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
} }