From 00370ddccf84273ccd99df6df56d2d8f208ab292 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 23 Dec 2022 12:41:39 +0800 Subject: [PATCH 1/4] handle except on taosd-client --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 20dbc2114d..f6ff2b16b2 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -111,6 +111,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; + } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && + (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { + dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); + terrno = pRpc->code; + goto _OVER; } if (pHandle->defaultNtype == NODE_END) { From 71bbc4fdd059f6bc6b718dbe1559fd78dd06b187 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 23 Dec 2022 22:21:28 +0800 Subject: [PATCH 2/4] update charset conv --- source/libs/index/inc/indexFstFile.h | 4 + source/libs/index/src/indexFstFile.c | 63 +++++++++++--- source/os/src/osEnv.c | 10 +-- source/os/src/osString.c | 119 ++++++++++++++++----------- 4 files changed, 132 insertions(+), 64 deletions(-) diff --git a/source/libs/index/inc/indexFstFile.h b/source/libs/index/inc/indexFstFile.h index 0ddffe7df0..91c2aa6d39 100644 --- a/source/libs/index/inc/indexFstFile.h +++ b/source/libs/index/inc/indexFstFile.h @@ -44,6 +44,10 @@ typedef struct IFileCtx { bool readOnly; char buf[256]; int64_t size; + + char wBuf[4096]; + int32_t wBufOffset; + #ifdef USE_MMAP char* ptr; #endif diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 5538584754..54e30ce0f4 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -38,14 +38,41 @@ static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t block return; } static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { + int tlen = len; if (ctx->type == TFILE) { - int nwr = taosWriteFile(ctx->file.pFile, buf, len); - assert(nwr == len); + int32_t cap = sizeof(ctx->file.wBuf); + if (len + ctx->file.wBufOffset >= cap) { + int32_t nw = cap - ctx->file.wBufOffset; + memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, nw); + taosWriteFile(ctx->file.pFile, ctx->file.wBuf, cap); + + memset(ctx->file.wBuf, 0, cap); + ctx->file.wBufOffset = 0; + + len -= nw; + buf += nw; + + nw = (len / cap) * cap; + if (nw != 0) { + taosWriteFile(ctx->file.pFile, buf, nw); + } + + len -= nw; + buf += nw; + if (len != 0) { + memcpy(ctx->file.wBuf, buf, len); + } + ctx->file.wBufOffset += len; + } else { + memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, len); + ctx->file.wBufOffset += len; + } + } else { memcpy(ctx->mem.buf + ctx->offset, buf, len); } - ctx->offset += len; - return len; + ctx->offset += tlen; + return tlen; } static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) { int nRead = 0; @@ -127,14 +154,22 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of } static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) { if (ctx->type == TFILE) { - int64_t file_size = 0; - taosStatFile(ctx->file.buf, &file_size, NULL); - return (int)file_size; + if (ctx->file.readOnly == false) { + return ctx->offset; + } else { + int64_t file_size = 0; + taosStatFile(ctx->file.buf, &file_size, NULL); + return (int)file_size; + } } return 0; } static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { if (ctx->type == TFILE) { + if (ctx->file.wBufOffset > 0) { + int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset); + ctx->file.wBufOffset = 0; + } taosFsyncFile(ctx->file.pFile); } else { // do nothing @@ -157,10 +192,14 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); taosFtruncateFile(ctx->file.pFile, 0); taosStatFile(path, &ctx->file.size, NULL); + + memset(ctx->file.wBuf, 0, sizeof(ctx->file.wBuf)); + ctx->file.wBufOffset = 0; } else { ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); - taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); + ctx->file.wBufOffset = 0; + #ifdef USE_MMAP ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.pFile, ctx->file.size); #endif @@ -195,6 +234,10 @@ void idxFileCtxDestroy(IFileCtx* ctx, bool remove) { if (ctx->type == TMEMORY) { taosMemoryFree(ctx->mem.buf); } else { + if (ctx->file.wBufOffset > 0) { + int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset); + ctx->file.wBufOffset = 0; + } ctx->flush(ctx); taosCloseFile(&ctx->file.pFile); if (ctx->file.readOnly) { @@ -202,10 +245,6 @@ void idxFileCtxDestroy(IFileCtx* ctx, bool remove) { munmap(ctx->file.ptr, ctx->file.size); #endif } - if (ctx->file.readOnly == false) { - int64_t file_size = 0; - taosStatFile(ctx->file.buf, &file_size, NULL); - } if (remove) { unlink(ctx->file.buf); } diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index c4627ffb75..7f0e6d1dee 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,11 +37,11 @@ float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; char *tsProcPath = NULL; -char tsSIMDBuiltins = 0; -char tsSSE42Enable = 0; -char tsAVXEnable = 0; -char tsAVX2Enable = 0; -char tsFMAEnable = 0; +char tsSIMDBuiltins = 0; +char tsSSE42Enable = 0; +char tsAVXEnable = 0; +char tsAVX2Enable = 0; +char tsFMAEnable = 0; void osDefaultInit() { taosSeedRand(taosSafeRand()); diff --git a/source/os/src/osString.c b/source/os/src/osString.c index f03778de2f..5419da1c0c 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -116,39 +116,36 @@ TdUcs4 *tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4) return memcpy(target_ucs4, source_ucs4, len_ucs4 * sizeof(TdUcs4)); } -int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { -#ifdef DISALLOW_NCHAR_WITHOUT_ICONV - printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n"); - return -1; -#else - iconv_t cd = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); - size_t ucs4_input_len = ucs4_max_len; - size_t outLen = ucs4_max_len; - if (iconv(cd, (char **)&ucs4, &ucs4_input_len, &mbs, &outLen) == -1) { - iconv_close(cd); - return -1; - } - - iconv_close(cd); - return (int32_t)(ucs4_max_len - outLen); -#endif -} - typedef struct { iconv_t conv; int8_t inUse; } SConv; -SConv *gConv = NULL; -int32_t convUsed = 0; -int32_t gConvMaxNum = 0; +typedef enum { M2C = 0, C2M } ConvType; + +// 0: Mbs --> Ucs4 +// 1: Ucs4--> Mbs +SConv *gConv[2] = {NULL, NULL}; +int32_t convUsed[2] = {0, 0}; +int32_t gConvMaxNum[2] = {0, 0}; int32_t taosConvInit(void) { - gConvMaxNum = 512; - gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv)); - for (int32_t i = 0; i < gConvMaxNum; ++i) { - gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); - if ((iconv_t)-1 == gConv[i].conv || (iconv_t)0 == gConv[i].conv) { + int8_t M2C = 0; + gConvMaxNum[M2C] = 512; + gConvMaxNum[1 - M2C] = 512; + + gConv[M2C] = taosMemoryCalloc(gConvMaxNum[M2C], sizeof(SConv)); + gConv[1 - M2C] = taosMemoryCalloc(gConvMaxNum[1 - M2C], sizeof(SConv)); + + for (int32_t i = 0; i < gConvMaxNum[M2C]; ++i) { + gConv[M2C][i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + if ((iconv_t)-1 == gConv[M2C][i].conv || (iconv_t)0 == gConv[M2C][i].conv) { + return -1; + } + } + for (int32_t i = 0; i < gConvMaxNum[1 - M2C]; ++i) { + gConv[1 - M2C][i].conv = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); + if ((iconv_t)-1 == gConv[1 - M2C][i].conv || (iconv_t)0 == gConv[1 - M2C][i].conv) { return -1; } } @@ -157,23 +154,33 @@ int32_t taosConvInit(void) { } void taosConvDestroy() { - for (int32_t i = 0; i < gConvMaxNum; ++i) { - iconv_close(gConv[i].conv); + int8_t M2C = 0; + for (int32_t i = 0; i < gConvMaxNum[M2C]; ++i) { + iconv_close(gConv[M2C][i].conv); } - taosMemoryFreeClear(gConv); - gConvMaxNum = -1; + for (int32_t i = 0; i < gConvMaxNum[1 - M2C]; ++i) { + iconv_close(gConv[1 - M2C][i].conv); + } + taosMemoryFreeClear(gConv[M2C]); + taosMemoryFreeClear(gConv[1 - M2C]); + gConvMaxNum[M2C] = -1; + gConvMaxNum[1 - M2C] = -1; } -iconv_t taosAcquireConv(int32_t *idx) { - if (gConvMaxNum <= 0) { +iconv_t taosAcquireConv(int32_t *idx, ConvType type) { + if (gConvMaxNum[type] <= 0) { *idx = -1; - return iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + if (type == M2C) { + return iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + } else { + return iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); + } } while (true) { - int32_t used = atomic_add_fetch_32(&convUsed, 1); - if (used > gConvMaxNum) { - used = atomic_sub_fetch_32(&convUsed, 1); + int32_t used = atomic_add_fetch_32(&convUsed[type], 1); + if (used > gConvMaxNum[type]) { + used = atomic_sub_fetch_32(&convUsed[type], 1); sched_yield(); continue; } @@ -181,31 +188,31 @@ iconv_t taosAcquireConv(int32_t *idx) { break; } - int32_t startId = taosGetSelfPthreadId() % gConvMaxNum; + int32_t startId = taosGetSelfPthreadId() % gConvMaxNum[type]; while (true) { - if (gConv[startId].inUse) { - startId = (startId + 1) % gConvMaxNum; + if (gConv[type][startId].inUse) { + startId = (startId + 1) % gConvMaxNum[type]; continue; } - int8_t old = atomic_val_compare_exchange_8(&gConv[startId].inUse, 0, 1); + int8_t old = atomic_val_compare_exchange_8(&gConv[type][startId].inUse, 0, 1); if (0 == old) { break; } } *idx = startId; - return gConv[startId].conv; + return gConv[type][startId].conv; } -void taosReleaseConv(int32_t idx, iconv_t conv) { +void taosReleaseConv(int32_t idx, iconv_t conv, ConvType type) { if (idx < 0) { iconv_close(conv); return; } - atomic_store_8(&gConv[idx].inUse, 0); - atomic_sub_fetch_32(&convUsed, 1); + atomic_store_8(&gConv[type][idx].inUse, 0); + atomic_sub_fetch_32(&convUsed[type], 1); } bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) { @@ -216,15 +223,15 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 memset(ucs4, 0, ucs4_max_len); int32_t idx = -1; - iconv_t conv = taosAcquireConv(&idx); + iconv_t conv = taosAcquireConv(&idx, M2C); size_t ucs4_input_len = mbsLength; size_t outLeft = ucs4_max_len; if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { - taosReleaseConv(idx, conv); + taosReleaseConv(idx, conv, M2C); return false; } - taosReleaseConv(idx, conv); + taosReleaseConv(idx, conv, M2C); if (len != NULL) { *len = (int32_t)(ucs4_max_len - outLeft); if (*len < 0) { @@ -236,6 +243,24 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 #endif } +int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { +#ifdef DISALLOW_NCHAR_WITHOUT_ICONV + printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n"); + return -1; +#else + + int32_t idx = -1; + iconv_t conv = taosAcquireConv(&idx, C2M); + size_t ucs4_input_len = ucs4_max_len; + size_t outLen = ucs4_max_len; + if (iconv(conv, (char **)&ucs4, &ucs4_input_len, &mbs, &outLen) == -1) { + taosReleaseConv(idx, conv, C2M); + return -1; + } + taosReleaseConv(idx, conv, C2M); + return (int32_t)(ucs4_max_len - outLen); +#endif +} bool taosValidateEncodec(const char *encodec) { #ifdef DISALLOW_NCHAR_WITHOUT_ICONV printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n"); From bc693f374cbfe4b7ab276457f2c345b8146d2371 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 23 Dec 2022 22:42:32 +0800 Subject: [PATCH 3/4] opt index write --- source/libs/index/inc/indexFstFile.h | 3 ++- source/libs/index/src/indexFstFile.c | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/index/inc/indexFstFile.h b/source/libs/index/inc/indexFstFile.h index 91c2aa6d39..d15141f79a 100644 --- a/source/libs/index/inc/indexFstFile.h +++ b/source/libs/index/inc/indexFstFile.h @@ -45,8 +45,9 @@ typedef struct IFileCtx { char buf[256]; int64_t size; - char wBuf[4096]; + char* wBuf; int32_t wBufOffset; + int32_t wBufCap; #ifdef USE_MMAP char* ptr; diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 54e30ce0f4..4620af8694 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -40,7 +40,7 @@ static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t block static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { int tlen = len; if (ctx->type == TFILE) { - int32_t cap = sizeof(ctx->file.wBuf); + int32_t cap = ctx->file.wBufCap; if (len + ctx->file.wBufOffset >= cap) { int32_t nw = cap - ctx->file.wBufOffset; memcpy(ctx->file.wBuf + ctx->file.wBufOffset, buf, nw); @@ -193,8 +193,9 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int taosFtruncateFile(ctx->file.pFile, 0); taosStatFile(path, &ctx->file.size, NULL); - memset(ctx->file.wBuf, 0, sizeof(ctx->file.wBuf)); ctx->file.wBufOffset = 0; + ctx->file.wBufCap = kBlockSize * 4; + ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap); } else { ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); @@ -239,6 +240,7 @@ void idxFileCtxDestroy(IFileCtx* ctx, bool remove) { ctx->file.wBufOffset = 0; } ctx->flush(ctx); + taosMemoryFreeClear(ctx->file.wBuf); taosCloseFile(&ctx->file.pFile); if (ctx->file.readOnly) { #ifdef USE_MMAP From 59e63082d5624cc8c61cd280082911aa8fc8af99 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 23 Dec 2022 22:45:59 +0800 Subject: [PATCH 4/4] opt index write --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f6ff2b16b2..f11378f84c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -111,12 +111,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { dGError("msg:%p, type:%s pCont is NULL", pRpc, TMSG_INFO(pRpc->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; goto _OVER; - } else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && - (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { - dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); - terrno = pRpc->code; - goto _OVER; - } + } /* else if ((pRpc->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRpc->code == TSDB_CODE_RPC_BROKEN_LINK) && + (!IsReq(pRpc)) && (pRpc->pCont == NULL)) { + dGError("msg:%p, type:%s pCont is NULL, err: %s", pRpc, TMSG_INFO(pRpc->msgType), tstrerror(pRpc->code)); + terrno = pRpc->code; + goto _OVER; + }*/ if (pHandle->defaultNtype == NODE_END) { dGError("msg:%p, type:%s not processed since no handle", pRpc, TMSG_INFO(pRpc->msgType));