fix: batch write
This commit is contained in:
parent
27643f7c21
commit
a6a8daec23
|
@ -115,7 +115,8 @@ typedef struct SRpcInit {
|
|||
int32_t connLimitNum;
|
||||
int32_t connLimitLock;
|
||||
|
||||
void *parent;
|
||||
int8_t supportBatch; // 0: no batch, 1. batch
|
||||
void *parent;
|
||||
} SRpcInit;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -290,6 +290,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
|||
|
||||
rpcInit.connLimitNum = connLimitNum;
|
||||
rpcInit.connLimitLock = 1;
|
||||
rpcInit.supportBatch = 1;
|
||||
|
||||
pTrans->clientRpc = rpcOpen(&rpcInit);
|
||||
if (pTrans->clientRpc == NULL) {
|
||||
|
|
|
@ -66,6 +66,7 @@ typedef struct {
|
|||
|
||||
int32_t connLimitNum;
|
||||
int8_t connLimitLock; // 0: no lock. 1. lock
|
||||
int8_t supportBatch; // 0: no batch, 1: support batch
|
||||
|
||||
int index;
|
||||
void* parent;
|
||||
|
|
|
@ -69,6 +69,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->failFastFp = pInit->ffp;
|
||||
pRpc->connLimitNum = pInit->connLimitNum;
|
||||
pRpc->connLimitLock = pInit->connLimitLock;
|
||||
pRpc->supportBatch = pInit->supportBatch;
|
||||
|
||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||
if (pRpc->numOfThreads <= 0) {
|
||||
|
|
|
@ -40,6 +40,8 @@ typedef struct SCliConn {
|
|||
bool broken; // link broken or not
|
||||
ConnStatus status; //
|
||||
|
||||
SCliBatch* pBatch;
|
||||
|
||||
int64_t refId;
|
||||
char* ip;
|
||||
|
||||
|
@ -62,6 +64,16 @@ typedef struct SCliMsg {
|
|||
int sent; //(0: no send, 1: alread sent)
|
||||
} SCliMsg;
|
||||
|
||||
typedef struct {
|
||||
queue wq;
|
||||
int32_t wLen;
|
||||
int32_t batchSize; //
|
||||
int32_t batch;
|
||||
char* dst;
|
||||
char* ip;
|
||||
uint16_t port;
|
||||
} SCliBatch;
|
||||
|
||||
typedef struct SCliThrd {
|
||||
TdThread thread; // tid
|
||||
int64_t pid; // pid
|
||||
|
@ -86,6 +98,7 @@ typedef struct SCliThrd {
|
|||
|
||||
SHashObj* failFastCache;
|
||||
SHashObj* connLimitCache;
|
||||
SHashObj* batchCache;
|
||||
|
||||
SCliMsg* stopMsg;
|
||||
|
||||
|
@ -132,6 +145,11 @@ static void cliAsyncCb(uv_async_t* handle);
|
|||
static void cliIdleCb(uv_idle_t* handle);
|
||||
static void cliPrepareCb(uv_prepare_t* handle);
|
||||
|
||||
static void cliSendBatch(const SCliBatch* pBatch, SCliThrd* pThrd);
|
||||
static void cliSendBatchCb(uv_write_t* req, int status);
|
||||
// callback after conn to server
|
||||
static void cliConnBatchCb(uv_connect_t* req, int status);
|
||||
|
||||
static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
|
||||
|
||||
static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||
|
@ -167,6 +185,8 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd);
|
|||
static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
|
||||
static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL,
|
||||
cliHandleUpdate};
|
||||
/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
|
||||
/// NULL,cliHandleUpdate};
|
||||
|
||||
static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
|
||||
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||
|
@ -287,6 +307,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
|||
}
|
||||
destroyCmsg(msg);
|
||||
}
|
||||
transQueueClear(&conn->cliMsgs);
|
||||
memset(&conn->ctx, 0, sizeof(conn->ctx));
|
||||
}
|
||||
bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||
|
@ -888,6 +909,169 @@ void cliSend(SCliConn* pConn) {
|
|||
_RETURN:
|
||||
return;
|
||||
}
|
||||
|
||||
static SCliBatch* cliDumpBatch(SCliBatch* pBatch) {
|
||||
SCliBatch* pNewBatch = taosMemCalloc(1, sizeof(SClicBatch));
|
||||
pNewBatch->wq = pBatch->wq;
|
||||
|
||||
pNewBatch->batchSize = pBatch->batchSize;
|
||||
pNewBatch->batch = pBatch->batch;
|
||||
pNewBatch->wLen = pBatch->wLen;
|
||||
pNewBatch->dst = strdup(pBatch->dst);
|
||||
pNewBatch->ip = strdup(pBatch->ip);
|
||||
pNewBatch->port = pBatch->port;
|
||||
|
||||
QUEUE_INIT(&pBatch->wq);
|
||||
pBatch->batchSize = 0;
|
||||
pBatch->batch = 0;
|
||||
pBatch->wLen = 0;
|
||||
|
||||
return pNewBatch;
|
||||
}
|
||||
static void cliDestroyBatch(SCliBatch* pBatch) {
|
||||
while (!EMPTY_IS_EMPTY(&pBatch->wq)) {
|
||||
queue* h = QUEUE_HEAD(&pBatch->wq);
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
|
||||
QUEUE_REMOVE(&pMsg->q);
|
||||
destroyCmsg(p);
|
||||
}
|
||||
taosMemoryFree(pBatch->ip);
|
||||
taosMemoryFree(pBatch->dst);
|
||||
taosMemoryFree(pBatch);
|
||||
}
|
||||
static void cliSendBatch(SCliBatch* pBatch, SCliThrd* pThrd) {
|
||||
if (pBatch->wLen == 0 || EMPTY_IS_EMPTY(&pBatch->wq)) {
|
||||
return;
|
||||
}
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
SCliBatch* pNewBatch = cliDumpBatch(pBatch);
|
||||
|
||||
SCliConn* conn = getConnFromPool(pThrd->pool, pBatch->ip, pBatch->port);
|
||||
if (conn == NULL) {
|
||||
conn = cliCreateConn(pThrd);
|
||||
conn->pBatch = pNewBatch;
|
||||
conn->ip = strdup(conn->pBatch->ip);
|
||||
|
||||
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
|
||||
if (ipaddr == 0xffffffff) {
|
||||
uv_timer_stop(conn->timer);
|
||||
conn->timer->data = NULL;
|
||||
taosArrayPush(pThrd->timerList, &conn->timer);
|
||||
conn->timer = NULL;
|
||||
|
||||
cliHandleExcept(conn);
|
||||
return;
|
||||
}
|
||||
struct sockaddr_in addr;
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = ipaddr;
|
||||
addr.sin_port = (uint16_t)htons(port);
|
||||
|
||||
tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip);
|
||||
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4);
|
||||
if (fd == -1) {
|
||||
tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn,
|
||||
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
cliHandleExcept(conn);
|
||||
errno = 0;
|
||||
return;
|
||||
}
|
||||
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
|
||||
if (ret != 0) {
|
||||
tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
|
||||
cliHandleExcept(conn);
|
||||
return;
|
||||
}
|
||||
ret = transSetConnOption((uv_tcp_t*)conn->stream);
|
||||
if (ret != 0) {
|
||||
tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret));
|
||||
cliHandleExcept(conn);
|
||||
return;
|
||||
}
|
||||
|
||||
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||
if (ret != 0) {
|
||||
uv_timer_stop(conn->timer);
|
||||
conn->timer->data = NULL;
|
||||
taosArrayPush(pThrd->timerList, &conn->timer);
|
||||
conn->timer = NULL;
|
||||
|
||||
cliHandleFastFail(conn, ret);
|
||||
return;
|
||||
}
|
||||
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
conn->pBatch = pNewBatch;
|
||||
|
||||
int32_t wLen = pBatch->wLen;
|
||||
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
|
||||
int i = 0;
|
||||
|
||||
while (!EMPTY_IS_EMPTY(&pBatch->wq)) {
|
||||
queue* h = QUEUE_HEAD(&pBatch->wq);
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
QUEUE_REMOVE(&pMsg->q);
|
||||
|
||||
transQueuePush(conn->cliMsgs, pMsg);
|
||||
|
||||
STransConnCtx* pCtx = pCliMsg->ctx;
|
||||
|
||||
STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg);
|
||||
if (pMsg->pCont == 0) {
|
||||
pMsg->pCont = (void*)rpcMallocCont(0);
|
||||
pMsg->contLen = 0;
|
||||
}
|
||||
|
||||
int msgLen = transMsgLenFromCont(pMsg->contLen);
|
||||
STransMsgHead* pHead = transHeadFromCont(pMsg->pCont);
|
||||
|
||||
if (pHead->comp == 0) {
|
||||
pHead->ahandle = pCtx != NULL ? (uint64_t)pCtx->ahandle : 0;
|
||||
pHead->noResp = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
||||
pHead->persist = REQUEST_PERSIS_HANDLE(pMsg) ? 1 : 0;
|
||||
pHead->msgType = pMsg->msgType;
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
|
||||
pHead->traceId = pMsg->info.traceId;
|
||||
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
|
||||
}
|
||||
pHead->timestamp = taosHton64(taosGetTimestampUs());
|
||||
|
||||
if (pHead->comp == 0) {
|
||||
if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) {
|
||||
msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead);
|
||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||
}
|
||||
} else {
|
||||
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
|
||||
}
|
||||
|
||||
wb[i++] = uv_buf_init((char*)pHead, msgLen);
|
||||
}
|
||||
|
||||
pBatch->wLen = 0;
|
||||
uv_write_t* req = taosMemCalloc(1, sizeof(uv_write_t));
|
||||
req->data = pConn;
|
||||
uv_write(req, (uv_stream_t*)conn->stream, wb, wLen, cliSendBatchCb);
|
||||
taosMemoryFree(wb);
|
||||
}
|
||||
static void cliSendBatchCb(uv_write_t* req, int status) {
|
||||
SCliConn* conn = req->data;
|
||||
SCliThrd* thrd = conn->hostThrd;
|
||||
cliDestroyBatch(conn->pBatch);
|
||||
conn->pBatch = NULL;
|
||||
|
||||
if (status != 0) {
|
||||
cliHandleExcept(conn);
|
||||
} else {
|
||||
addConnToPool(thrd->pool, conn);
|
||||
}
|
||||
}
|
||||
static void cliHandleFastFail(SCliConn* pConn, int status) {
|
||||
SCliThrd* pThrd = pConn->hostThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
@ -1218,29 +1402,93 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
}
|
||||
tGTrace("%s conn %p ready", pTransInst->label, conn);
|
||||
}
|
||||
static void cliAsyncCb(uv_async_t* handle) {
|
||||
SAsyncItem* item = handle->data;
|
||||
SCliThrd* pThrd = item->pThrd;
|
||||
SCliMsg* pMsg = NULL;
|
||||
|
||||
// batch process to avoid to lock/unlock frequently
|
||||
queue wq;
|
||||
taosThreadMutexLock(&item->mtx);
|
||||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
taosThreadMutexUnlock(&item->mtx);
|
||||
|
||||
static void cliNoBatchDealReq(queue wq, SCliThrd* pThrd) {
|
||||
int count = 0;
|
||||
|
||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
queue* h = QUEUE_HEAD(&wq);
|
||||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||
|
||||
count++;
|
||||
}
|
||||
if (count >= 2) {
|
||||
tTrace("cli process batch size:%d", count);
|
||||
}
|
||||
}
|
||||
|
||||
static void cliHandleBatch() static void cliBatchDealReq(queue wq, SCliThrd* pThrd) {
|
||||
int count = 0;
|
||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||
queue* h = QUEUE_HEAD(&wq);
|
||||
QUEUE_REMOVE(h);
|
||||
|
||||
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
|
||||
if (REQUEST_NO_RESP(&pMsg->msg)) {
|
||||
STransConnCtx* pCtx = pMsg->ctx;
|
||||
|
||||
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
|
||||
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
SCliBatch *ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)));
|
||||
if (*ppBatch == NULL) {
|
||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||
QUEUE_INIT(&pBatch->wq);
|
||||
QUEUE_PUSH(&pBatch->wq, h);
|
||||
pBatch->wLen += 1;
|
||||
pBatch->batchSize += pMsg->msg.contLen;
|
||||
|
||||
pBatch->dst = strdup(key);
|
||||
pBatch->ip = strdup(ip);
|
||||
pBatch->port = (uint16_t)port;
|
||||
|
||||
taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*));
|
||||
} else {
|
||||
QUEUE_PUSH(&(*ppBatch)->wq, h);
|
||||
(*pBatch)->wLen += 1;
|
||||
(*pBatch)->batchSize += pMsg->msg.contLen;
|
||||
}
|
||||
}
|
||||
(*cliAsyncHandle[pMsg->type])(pMsg, pThrd);
|
||||
count++;
|
||||
}
|
||||
|
||||
void** pIter = taoskHashIterate(pThrd->batchCache, NULL);
|
||||
while (pIter != NULL) {
|
||||
SCliBatch* batch = (SCliBatch*)(*pIter);
|
||||
|
||||
cliSendBatch(batch, pThrd);
|
||||
pIter = (void**)taosHashIterate(info, pIter);
|
||||
}
|
||||
|
||||
if (count >= 2) {
|
||||
tTrace("cli process batch size:%d", count);
|
||||
}
|
||||
}
|
||||
|
||||
static void cliAsyncCb(uv_async_t* handle) {
|
||||
SAsyncItem* item = handle->data;
|
||||
SCliThrd* pThrd = item->pThrd;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
|
||||
SCliMsg* pMsg = NULL;
|
||||
// batch process to avoid to lock/unlock frequently
|
||||
queue wq;
|
||||
taosThreadMutexLock(&item->mtx);
|
||||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
taosThreadMutexUnlock(&item->mtx);
|
||||
|
||||
int8_t supportBatch = pTransInst->supprtBatch;
|
||||
if (supportBatch == 0) {
|
||||
cliNotBatchDealReq(wq, pThrd);
|
||||
} else if (supportBatch == 1) {
|
||||
cliBatchDealReq(wq, pThrd);
|
||||
}
|
||||
|
||||
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
|
||||
}
|
||||
|
@ -1456,6 +1704,8 @@ static SCliThrd* createThrdObj(void* trans) {
|
|||
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
|
||||
pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
|
||||
|
||||
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, hash_no_lock);
|
||||
|
||||
pThrd->quit = false;
|
||||
return pThrd;
|
||||
}
|
||||
|
@ -1484,6 +1734,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
|||
taosHashCleanup(pThrd->fqdn2ipCache);
|
||||
taosHashCleanup(pThrd->failFastCache);
|
||||
taosHashCleanup(pThrd->connLimitCache);
|
||||
taosHashCleanup(pThrd->batchCache);
|
||||
taosMemoryFree(pThrd);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue