Merge pull request #11235 from taosdata/feature/indexUpdate

Feature/index update
This commit is contained in:
Yihao Deng 2022-04-03 15:39:40 +08:00 committed by GitHub
commit 46d21aac0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 22 deletions

View File

@ -193,6 +193,7 @@ static void uvHandleReq(SSrvConn* pConn) {
transMsg.ahandle = (void*)pHead->ahandle; transMsg.ahandle = (void*)pHead->ahandle;
transMsg.handle = NULL; transMsg.handle = NULL;
// transDestroyBuffer(&pConn->readBuf);
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
@ -249,6 +250,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
conn->broken = true; conn->broken = true;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (conn->regArg.init) { if (conn->regArg.init) {
tTrace("server conn %p broken, notify server app", conn);
STrans* pTransInst = conn->pTransInst; STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg)); memset(&conn->regArg, 0, sizeof(conn->regArg));
@ -270,7 +272,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data; SSrvConn* conn = req->data;
transClearBuffer(&conn->readBuf); // transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
tTrace("server conn %p data already was written on stream", conn); tTrace("server conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) { if (!transQueueEmpty(&conn->srvMsgs)) {

View File

@ -35,8 +35,8 @@ typedef struct {
} SInfo; } SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle; SInfo *pInfo = (SInfo *)pMsg->ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, // tError("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code); // pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet; if (pEpSet) pInfo->epSet = *pEpSet;
@ -51,9 +51,9 @@ static void *sendRequest(void *param) {
SInfo * pInfo = (SInfo *)param; SInfo * pInfo = (SInfo *)param;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
tDebug("thread:%d, start to send request", pInfo->index); tError("thread:%d, start to send request", pInfo->index);
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs); tError("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
int u100 = 0; int u100 = 0;
int u500 = 0; int u500 = 0;
int u1000 = 0; int u1000 = 0;
@ -68,7 +68,7 @@ static void *sendRequest(void *param) {
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs(); int64_t start = taosGetTimestampUs();
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); if (pInfo->num % 20000 == 0) tError("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
// tsem_wait(&pInfo->rspSem); // tsem_wait(&pInfo->rspSem);
tsem_wait(&pInfo->rspSem); tsem_wait(&pInfo->rspSem);
int64_t end = taosGetTimestampUs() - start; int64_t end = taosGetTimestampUs() - start;
@ -88,7 +88,7 @@ static void *sendRequest(void *param) {
} }
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000); tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
tDebug("thread:%d, it is over", pInfo->index); tError("thread:%d, it is over", pInfo->index);
tcount++; tcount++;
return NULL; return NULL;
@ -124,7 +124,7 @@ int main(int argc, char *argv[]) {
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcDebugFlag = 143; rpcDebugFlag = 131;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
@ -170,6 +170,10 @@ int main(int argc, char *argv[]) {
} }
} }
const char *path = "/tmp/transport/client";
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
taosInitLog("client.log", 10); taosInitLog("client.log", 10);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
@ -178,8 +182,8 @@ int main(int argc, char *argv[]) {
return -1; return -1;
} }
tInfo("client is initialized"); tError("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); tError("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
taosGetTimeOfDay(&systemTime); taosGetTimeOfDay(&systemTime);
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
@ -208,8 +212,9 @@ int main(int argc, char *argv[]) {
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
float usedTime = (endTime - startTime) / 1000.0f; // mseconds float usedTime = (endTime - startTime) / 1000.0f; // mseconds
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); tError("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); tError("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime,
msgSize);
int ch = getchar(); int ch = getchar();
UNUSED(ch); UNUSED(ch);

View File

@ -125,7 +125,7 @@ int main(int argc, char *argv[]) {
rpcInit.idleTime = 2 * 1500; rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo; rpcInit.afp = retrieveAuthInfo;
rpcDebugFlag = 143; rpcDebugFlag = 131;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
@ -160,6 +160,11 @@ int main(int argc, char *argv[]) {
tsAsyncLog = 0; tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
const char *path = "/tmp/transport/server";
taosRemoveDir(path);
taosMkDir(path);
tstrncpy(tsLogDir, path, PATH_MAX);
taosInitLog("server.log", 10); taosInitLog("server.log", 10);
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);