remove assert

This commit is contained in:
yihaoDeng 2022-08-31 15:36:53 +08:00
parent d0669151af
commit cac4cb12e6
3 changed files with 40 additions and 28 deletions

View File

@ -39,7 +39,8 @@ static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
} }
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
if (ctx->type == TFILE) { if (ctx->type == TFILE) {
assert(len == taosWriteFile(ctx->file.pFile, buf, len)); int nwr = taosWriteFile(ctx->file.pFile, buf, len);
assert(nwr == len);
} else { } else {
memcpy(ctx->mem.buf + ctx->offset, buf, len); memcpy(ctx->mem.buf + ctx->offset, buf, len);
} }

View File

@ -21,6 +21,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
// clang-format on
#define HTTP_RECV_BUF_SIZE 1024 #define HTTP_RECV_BUF_SIZE 1024
@ -29,7 +30,7 @@ typedef struct SHttpClient {
uv_tcp_t tcp; uv_tcp_t tcp;
uv_write_t req; uv_write_t req;
uv_buf_t* wbuf; uv_buf_t* wbuf;
char *rbuf; char* rbuf;
char* addr; char* addr;
uint16_t port; uint16_t port;
} SHttpClient; } SHttpClient;
@ -130,37 +131,36 @@ static void destroyHttpClient(SHttpClient* cli) {
taosMemoryFree(cli->rbuf); taosMemoryFree(cli->rbuf);
taosMemoryFree(cli->addr); taosMemoryFree(cli->addr);
taosMemoryFree(cli); taosMemoryFree(cli);
} }
static void clientCloseCb(uv_handle_t* handle) { static void clientCloseCb(uv_handle_t* handle) {
SHttpClient* cli = handle->data; SHttpClient* cli = handle->data;
destroyHttpClient(cli); destroyHttpClient(cli);
} }
static void clientAllocBuffCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { static void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SHttpClient* cli = handle->data; SHttpClient* cli = handle->data;
buf->base = cli->rbuf; buf->base = cli->rbuf;
buf->len = HTTP_RECV_BUF_SIZE; buf->len = HTTP_RECV_BUF_SIZE;
} }
static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t *buf) { static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SHttpClient* cli = handle->data; SHttpClient* cli = handle->data;
if (nread < 0) { if (nread < 0) {
uError("http-report recv error:%s", uv_err_name(nread)); uError("http-report recv error:%s", uv_err_name(nread));
} else { } else {
uTrace("http-report succ to recv %d bytes, just ignore it", nread); uTrace("http-report succ to recv %d bytes, just ignore it", nread);
} }
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
static void clientSentCb(uv_write_t* req, int32_t status) { static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data; SHttpClient* cli = req->data;
if (status != 0) { if (status != 0) {
terrno = TAOS_SYSTEM_ERROR(status); terrno = TAOS_SYSTEM_ERROR(status);
uError("http-report failed to send data %s", uv_strerror(status)); uError("http-report failed to send data %s", uv_strerror(status));
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
return; return;
} else { } else {
uTrace("http-report succ to send data"); uTrace("http-report succ to send data");
} }
uv_read_start((uv_stream_t *)&cli->tcp, clientAllocBuffCb, clientRecvCb); uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
} }
static void clientConnCb(uv_connect_t* req, int32_t status) { static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpClient* cli = req->data; SHttpClient* cli = req->data;
@ -212,7 +212,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
cli->tcp.data = cli; cli->tcp.data = cli;
cli->req.data = cli; cli->req.data = cli;
cli->wbuf = wb; cli->wbuf = wb;
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
cli->addr = tstrdup(server); cli->addr = tstrdup(server);
cli->port = port; cli->port = port;
@ -233,4 +233,3 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
uv_loop_close(loop); uv_loop_close(loop);
return terrno; return terrno;
} }
// clang-format on

View File

@ -906,23 +906,30 @@ static void uvDestroyConn(uv_handle_t* handle) {
} }
} }
static void uvPipeListenCb(uv_stream_t* handle, int status) { static void uvPipeListenCb(uv_stream_t* handle, int status) {
ASSERT(status == 0); if (status != 0) {
tError("server failed to init pipe");
return;
}
SServerObj* srv = container_of(handle, SServerObj, pipeListen); SServerObj* srv = container_of(handle, SServerObj, pipeListen);
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
ASSERT(1 == uv_is_readable((uv_stream_t*)pipe)); int ret = uv_pipe_init(srv->loop, pipe, 1);
ASSERT(1 == uv_is_writable((uv_stream_t*)pipe)); assert(ret == 0);
ASSERT(0 == uv_is_closing((uv_handle_t*)pipe));
ret = uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe);
assert(ret == 0);
ret = uv_is_readable((uv_stream_t*)pipe);
assert(ret == 1);
ret = uv_is_writable((uv_stream_t*)pipe);
assert(ret == 1);
ret = uv_is_closing((uv_handle_t*)pipe);
assert(ret == 0);
srv->numOfWorkerReady++; srv->numOfWorkerReady++;
// ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb));
// r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
// ASSERT(r == 0);
} }
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
@ -937,7 +944,9 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
assert(ret == 0);
#ifdef WINDOWS #ifdef WINDOWS
char pipeName[64]; char pipeName[64];
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-" PRIu64, taosSafeRand(), GetCurrentProcessId()); snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-" PRIu64, taosSafeRand(), GetCurrentProcessId());
@ -946,8 +955,11 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
taosGetSelfPthreadId()); taosGetSelfPthreadId());
#endif #endif
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); ret = uv_pipe_bind(&srv->pipeListen, pipeName);
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); assert(ret == 0);
ret = uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb);
assert(ret == 0);
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));