From e5c2b6aa10ab3fe032567afb1e3ba027b2a55b21 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Jan 2021 22:27:43 +0800 Subject: [PATCH] TD-1207 --- src/common/inc/tdataformat.h | 4 +- src/common/inc/tglobal.h | 4 +- src/common/src/tglobal.c | 8 ++-- src/kit/shell/src/shellMain.c | 2 +- src/mnode/CMakeLists.txt | 2 +- src/mnode/src/mnodeAcct.c | 8 ++-- src/mnode/src/mnodeCluster.c | 10 ++--- src/mnode/src/mnodeDb.c | 8 ++-- src/mnode/src/mnodeDnode.c | 14 +++---- src/mnode/src/mnodeMnode.c | 6 +-- src/mnode/src/mnodeSdb.c | 22 +++++----- src/mnode/src/mnodeShow.c | 2 +- src/mnode/src/mnodeTable.c | 64 ++++++++++++++--------------- src/mnode/src/mnodeUser.c | 12 +++--- src/mnode/src/mnodeVgroup.c | 8 ++-- src/os/inc/osFile.h | 2 +- src/os/inc/osSocket.h | 1 + src/os/src/detail/osFile.c | 2 +- src/os/src/detail/osSocket.c | 10 +++++ src/os/src/windows/wFile.c | 2 +- src/os/src/windows/wSocket.c | 1 + src/plugins/http/CMakeLists.txt | 2 +- src/plugins/http/inc/httpInt.h | 4 +- src/plugins/http/src/httpGcJson.c | 2 +- src/plugins/http/src/httpGzip.c | 4 +- src/plugins/http/src/httpJson.c | 12 +++--- src/plugins/http/src/httpParser.c | 14 +++---- src/plugins/http/src/httpQueue.c | 4 +- src/plugins/http/src/httpServer.c | 22 +++------- src/plugins/http/src/httpTgHandle.c | 2 +- src/plugins/http/src/httpUtil.c | 4 +- src/plugins/monitor/CMakeLists.txt | 2 +- src/plugins/monitor/src/monMain.c | 8 ++-- src/rpc/src/rpcTcp.c | 2 +- src/sync/CMakeLists.txt | 2 +- src/sync/inc/syncInt.h | 4 +- src/sync/inc/syncTcp.h | 4 +- src/sync/src/syncArbitrator.c | 25 +++++++---- src/sync/src/syncMain.c | 14 +++---- src/sync/src/syncRestore.c | 6 +-- src/sync/src/syncRetrieve.c | 11 +++-- src/sync/src/syncTcp.c | 30 +++++++++----- src/util/inc/tsocket.h | 8 ++++ src/vnode/src/vnodeCfg.c | 26 ++++++------ src/wal/CMakeLists.txt | 2 +- src/wal/src/walMgmt.c | 2 +- src/wal/src/walWrite.c | 4 +- 47 files changed, 218 insertions(+), 194 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 8d4949d9b4..f1bf89aed4 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -29,7 +29,7 @@ extern "C" { #define STR_TO_VARSTR(x, str) \ do { \ - VarDataLenT __len = strlen(str); \ + VarDataLenT __len = (int32_t)strlen(str); \ *(VarDataLenT *)(x) = __len; \ memcpy(varDataVal(x), (str), __len); \ } while (0); @@ -42,7 +42,7 @@ extern "C" { #define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ do { \ - *(VarDataLenT *)(x) = (_size); \ + *(VarDataLenT *)(x) = (int32_t)(_size); \ memcpy(varDataVal(x), (str), (_size)); \ } while (0); diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index f6a08f8198..bf0d4e61df 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -89,8 +89,8 @@ extern int32_t tsMinRowsInFileBlock; extern int32_t tsMaxRowsInFileBlock; extern int16_t tsCommitTime; // seconds extern int32_t tsTimePrecision; -extern int16_t tsCompression; -extern int16_t tsWAL; +extern int8_t tsCompression; +extern int8_t tsWAL; extern int32_t tsFsyncPeriod; extern int32_t tsReplications; extern int32_t tsQuorum; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 551fb96195..a7826f91ad 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -122,8 +122,8 @@ int32_t tsMinRowsInFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK; int32_t tsMaxRowsInFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK; int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION; -int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; -int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; +int8_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; +int8_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; @@ -769,7 +769,7 @@ static void doInitGlobalConfig(void) { cfg.option = "comp"; cfg.ptr = &tsCompression; - cfg.valType = TAOS_CFG_VTYPE_INT16; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = TSDB_MIN_COMP_LEVEL; cfg.maxValue = TSDB_MAX_COMP_LEVEL; @@ -779,7 +779,7 @@ static void doInitGlobalConfig(void) { cfg.option = "walLevel"; cfg.ptr = &tsWAL; - cfg.valType = TAOS_CFG_VTYPE_INT16; + cfg.valType = TAOS_CFG_VTYPE_INT8; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = TSDB_MIN_WAL_LEVEL; cfg.maxValue = TSDB_MAX_WAL_LEVEL; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 041ad71ccb..496cef41ba 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -21,7 +21,7 @@ pthread_t pid; static tsem_t cancelSem; -void shellQueryInterruptHandler(int signum) { +void shellQueryInterruptHandler(int32_t signum) { tsem_post(&cancelSem); } diff --git a/src/mnode/CMakeLists.txt b/src/mnode/CMakeLists.txt index ff5c9335b6..6e6d515de4 100644 --- a/src/mnode/CMakeLists.txt +++ b/src/mnode/CMakeLists.txt @@ -1,7 +1,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) -IF (TD_LINUX) +IF (TD_LINUX OR TD_WINDOWS) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) diff --git a/src/mnode/src/mnodeAcct.c b/src/mnode/src/mnodeAcct.c index 6fba05674f..afe474df6b 100644 --- a/src/mnode/src/mnodeAcct.c +++ b/src/mnode/src/mnodeAcct.c @@ -81,7 +81,7 @@ static int32_t mnodeAcctActionDecode(SSdbRow *pRow) { } static int32_t mnodeAcctActionRestored() { - int32_t numOfRows = sdbGetNumOfRows(tsAcctSdb); + int64_t numOfRows = sdbGetNumOfRows(tsAcctSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mInfo("dnode first deploy, create root acct"); int32_t code = mnodeCreateRootAcct(); @@ -97,14 +97,14 @@ static int32_t mnodeAcctActionRestored() { int32_t mnodeInitAccts() { SAcctObj tObj; - tsAcctUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsAcctUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); SSdbTableDesc desc = { .id = SDB_TABLE_ACCOUNT, .name = "accounts", .hashSessions = TSDB_DEFAULT_ACCOUNTS_HASH_SIZE, .maxRowSize = tsAcctUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_STRING, .fpInsert = mnodeAcctActionInsert, .fpDelete = mnodeAcctActionDelete, @@ -206,7 +206,7 @@ void mnodeDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) { } static int32_t mnodeCreateRootAcct() { - int32_t numOfAccts = sdbGetNumOfRows(tsAcctSdb); + int64_t numOfAccts = sdbGetNumOfRows(tsAcctSdb); if (numOfAccts != 0) return TSDB_CODE_SUCCESS; SAcctObj *pAcct = malloc(sizeof(SAcctObj)); diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index a35e304810..0931f31012 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -68,7 +68,7 @@ static int32_t mnodeClusterActionDecode(SSdbRow *pRow) { } static int32_t mnodeClusterActionRestored() { - int32_t numOfRows = sdbGetNumOfRows(tsClusterSdb); + int64_t numOfRows = sdbGetNumOfRows(tsClusterSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mInfo("dnode first deploy, create cluster"); int32_t code = mnodeCreateCluster(); @@ -84,14 +84,14 @@ static int32_t mnodeClusterActionRestored() { int32_t mnodeInitCluster() { SClusterObj tObj; - tsClusterUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsClusterUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); SSdbTableDesc desc = { .id = SDB_TABLE_CLUSTER, .name = "cluster", .hashSessions = TSDB_DEFAULT_CLUSTER_HASH_SIZE, .maxRowSize = tsClusterUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_STRING, .fpInsert = mnodeClusterActionInsert, .fpDelete = mnodeClusterActionDelete, @@ -139,7 +139,7 @@ void mnodeDecClusterRef(SClusterObj *pCluster) { } static int32_t mnodeCreateCluster() { - int32_t numOfClusters = sdbGetNumOfRows(tsClusterSdb); + int64_t numOfClusters = sdbGetNumOfRows(tsClusterSdb); if (numOfClusters != 0) return TSDB_CODE_SUCCESS; SClusterObj *pCluster = malloc(sizeof(SClusterObj)); @@ -226,7 +226,7 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *) pWrite = pCluster->createdTime; + *(int64_t *) pWrite = pCluster->createdTime; cols++; mnodeDecClusterRef(pCluster); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 8a03b1cd0e..aa402b257a 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -143,14 +143,14 @@ static int32_t mnodeDbActionRestored() { int32_t mnodeInitDbs() { SDbObj tObj; - tsDbUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsDbUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); SSdbTableDesc desc = { .id = SDB_TABLE_DB, .name = "dbs", .hashSessions = TSDB_DEFAULT_DBS_HASH_SIZE, .maxRowSize = tsDbUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_STRING, .fpInsert = mnodeDbActionInsert, .fpDelete = mnodeDbActionDelete, @@ -192,11 +192,11 @@ SDbObj *mnodeGetDb(char *db) { } void mnodeIncDbRef(SDbObj *pDb) { - return sdbIncRef(tsDbSdb, pDb); + sdbIncRef(tsDbSdb, pDb); } void mnodeDecDbRef(SDbObj *pDb) { - return sdbDecRef(tsDbSdb, pDb); + sdbDecRef(tsDbSdb, pDb); } SDbObj *mnodeGetDbByTableId(char *tableId) { diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 792e41dd5b..01034b170f 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -148,7 +148,7 @@ static int32_t mnodeDnodeActionDecode(SSdbRow *pRow) { } static int32_t mnodeDnodeActionRestored() { - int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); + int64_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mInfo("dnode first deploy, create dnode:%s", tsLocalEp); mnodeCreateDnode(tsLocalEp, NULL); @@ -165,7 +165,7 @@ static int32_t mnodeDnodeActionRestored() { int32_t mnodeInitDnodes() { SDnodeObj tObj; - tsDnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsDnodeUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); pthread_mutex_init(&tsDnodeEpsMutex, NULL); SSdbTableDesc desc = { @@ -173,7 +173,7 @@ int32_t mnodeInitDnodes() { .name = "dnodes", .hashSessions = TSDB_DEFAULT_DNODES_HASH_SIZE, .maxRowSize = tsDnodeUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_AUTO, .fpInsert = mnodeDnodeActionInsert, .fpDelete = mnodeDnodeActionDelete, @@ -227,7 +227,7 @@ void mnodeCancelGetNextDnode(void *pIter) { } int32_t mnodeGetDnodesNum() { - return sdbGetNumOfRows(tsDnodeSdb); + return (int32_t)sdbGetNumOfRows(tsDnodeSdb); } int32_t mnodeGetOnlinDnodesCpuCoreNum() { @@ -407,7 +407,7 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { int64_t checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); if ((0 != strncasecmp(clusterCfg->timezone, tsTimezone, strlen(tsTimezone))) && (checkTime != clusterCfg->checkTime)) { mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg parameters inconsistent", clusterCfg->timezone, @@ -638,9 +638,9 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { char *temp = strchr(dnodeEp, ':'); if (!temp) { - int len = strlen(dnodeEp); + int32_t len = (int32_t)strlen(dnodeEp); if (dnodeEp[len - 1] == ';') dnodeEp[len - 1] = 0; - len = strlen(dnodeEp); + len = (int32_t)strlen(dnodeEp); snprintf(dnodeEp + len, TSDB_EP_LEN - len, ":%d", tsServerPort); } ep = dnodeEp; diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 3ea41c41c6..49473d3e06 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -136,14 +136,14 @@ int32_t mnodeInitMnodes() { mnodeMnodeInitLock(); SMnodeObj tObj; - tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsMnodeUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); SSdbTableDesc desc = { .id = SDB_TABLE_MNODE, .name = "mnodes", .hashSessions = TSDB_DEFAULT_MNODES_HASH_SIZE, .maxRowSize = tsMnodeUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_INT, .fpInsert = mnodeMnodeActionInsert, .fpDelete = mnodeMnodeActionDelete, @@ -176,7 +176,7 @@ void mnodeCleanupMnodes() { } int32_t mnodeGetMnodesNum() { - return sdbGetNumOfRows(tsMnodeSdb); + return (int32_t)sdbGetNumOfRows(tsMnodeSdb); } void *mnodeGetMnode(int32_t mnodeId) { diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index ae495108b3..17cfd3e9d4 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -207,7 +207,7 @@ static void sdbRestoreTables() { (*pTable->fpRestored)(); } - totalRows += pTable->numOfRows; + totalRows += (int32_t)pTable->numOfRows; numOfTables++; sdbInfo("vgId:1, sdb:%s is checked, rows:%" PRId64, pTable->name, pTable->numOfRows); } @@ -475,7 +475,7 @@ void sdbIncRef(void *tparam, void *pRow) { if (pRow == NULL || tparam == NULL) return; SSdbTable *pTable = tparam; - int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); + int32_t * pRefCount = (int32_t *)((char *)pRow + pTable->refCountPos); int32_t refCount = atomic_add_fetch_32(pRefCount, 1); sdbTrace("vgId:1, sdb:%s, inc ref to row:%p:%s:%d", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); } @@ -484,11 +484,11 @@ void sdbDecRef(void *tparam, void *pRow) { if (pRow == NULL || tparam == NULL) return; SSdbTable *pTable = tparam; - int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); + int32_t * pRefCount = (int32_t *)((char *)pRow + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); sdbTrace("vgId:1, sdb:%s, dec ref to row:%p:%s:%d", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); - int32_t *updateEnd = pRow + pTable->refCountPos - 4; + int32_t *updateEnd = (int32_t *)((char *)pRow + pTable->refCountPos - 4); if (refCount <= 0 && *updateEnd) { sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); SSdbRow row = {.pObj = pRow}; @@ -501,7 +501,7 @@ static void *sdbGetRowMeta(SSdbTable *pTable, void *key) { int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { - keySize = strlen((char *)key); + keySize = (int32_t)strlen((char *)key); } void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize); @@ -534,7 +534,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { - keySize = strlen((char *)key); + keySize = (int32_t)strlen((char *)key); } pthread_mutex_lock(&pTable->mutex); @@ -564,7 +564,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { } static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { - int32_t *updateEnd = pRow->pObj + pTable->refCountPos - 4; + int32_t *updateEnd = (int32_t *)((char*)pRow->pObj + pTable->refCountPos - 4); bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0; if (!set) { sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->name, @@ -577,7 +577,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { void * key = sdbGetObjKey(pTable, pRow->pObj); int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { - keySize = strlen((char *)key); + keySize = (int32_t)strlen((char *)key); } pthread_mutex_lock(&pTable->mutex); @@ -764,7 +764,7 @@ bool sdbCheckRowDeleted(void *tparam, void *pRow) { SSdbTable *pTable = tparam; if (pTable == NULL) return false; - int32_t *updateEnd = pRow + pTable->refCountPos - 4; + int32_t *updateEnd = (int32_t *)((char*)pRow + pTable->refCountPos - 4); return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1; } @@ -942,14 +942,14 @@ static int32_t sdbInitWorker() { static void sdbCleanupWorker() { for (int32_t i = 0; i < tsSdbPool.num; ++i) { SSdbWorker *pWorker = tsSdbPool.worker + i; - if (pWorker->thread) { + if (taosCheckPthreadValid(pWorker->thread)) { taosQsetThreadResume(tsSdbWQset); } } for (int32_t i = 0; i < tsSdbPool.num; ++i) { SSdbWorker *pWorker = tsSdbPool.worker + i; - if (pWorker->thread) { + if (taosCheckPthreadValid(pWorker->thread)) { pthread_join(pWorker->thread, NULL); } } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 6b9f0e26a7..0377df97fd 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -218,7 +218,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { } pRsp->numOfRows = htonl(rowsRead); - pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision + pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.len = size; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index de37b09345..a0c8d88c51 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -222,23 +222,23 @@ static int32_t mnodeChildTableActionEncode(SSdbRow *pRow) { SCTableObj *pTable = pRow->pObj; assert(pTable != NULL && pRow->rowData != NULL); - int32_t len = strlen(pTable->info.tableId); + int32_t len = (int32_t)strlen(pTable->info.tableId); if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID; memcpy(pRow->rowData, pTable->info.tableId, len); - memset(pRow->rowData + len, 0, 1); + memset((char *)pRow->rowData + len, 0, 1); len++; - memcpy(pRow->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); + memcpy((char *)pRow->rowData + len, (char *)pTable + sizeof(char *), tsChildTableUpdateSize); len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - memcpy(pRow->rowData + len, pTable->schema, schemaSize); + memcpy((char *)pRow->rowData + len, pTable->schema, schemaSize); len += schemaSize; if (pTable->sqlLen != 0) { - memcpy(pRow->rowData + len, pTable->sql, pTable->sqlLen); + memcpy((char *)pRow->rowData + len, pTable->sql, pTable->sqlLen); len += pTable->sqlLen; } } @@ -253,7 +253,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) { SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - int32_t len = strlen(pRow->rowData); + int32_t len = (int32_t)strlen(pRow->rowData); if (len >= TSDB_TABLE_FNAME_LEN) { free(pTable); return TSDB_CODE_MND_INVALID_TABLE_ID; @@ -261,7 +261,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) { pTable->info.tableId = strdup(pRow->rowData); len++; - memcpy((char*)pTable + sizeof(char *), pRow->rowData + len, tsChildTableUpdateSize); + memcpy((char *)pTable + sizeof(char *), (char *)pRow->rowData + len, tsChildTableUpdateSize); len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { @@ -271,7 +271,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_INVALID_TABLE_TYPE; } - memcpy(pTable->schema, pRow->rowData + len, schemaSize); + memcpy(pTable->schema, (char *)pRow->rowData + len, schemaSize); len += schemaSize; if (pTable->sqlLen != 0) { @@ -280,7 +280,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_OUT_OF_MEMORY; } - memcpy(pTable->sql, pRow->rowData + len, pTable->sqlLen); + memcpy(pTable->sql, (char *)pRow->rowData + len, pTable->sqlLen); } } @@ -352,14 +352,14 @@ static int32_t mnodeChildTableActionRestored() { static int32_t mnodeInitChildTables() { SCTableObj tObj; - tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; + tsChildTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type); SSdbTableDesc desc = { .id = SDB_TABLE_CTABLE, .name = "ctables", .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, .maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_VAR_STRING, .fpInsert = mnodeChildTableActionInsert, .fpDelete = mnodeChildTableActionDelete, @@ -501,18 +501,18 @@ static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) { SSTableObj *pStable = pRow->pObj; assert(pRow->pObj != NULL && pRow->rowData != NULL); - int32_t len = strlen(pStable->info.tableId); + int32_t len = (int32_t)strlen(pStable->info.tableId); if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID; memcpy(pRow->rowData, pStable->info.tableId, len); - memset(pRow->rowData + len, 0, 1); + memset((char *)pRow->rowData + len, 0, 1); len++; - memcpy(pRow->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); + memcpy((char *)pRow->rowData + len, (char *)pStable + sizeof(char *), tsSuperTableUpdateSize); len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); - memcpy(pRow->rowData + len, pStable->schema, schemaSize); + memcpy((char *)pRow->rowData + len, pStable->schema, schemaSize); len += schemaSize; pRow->rowSize = len; @@ -525,7 +525,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) { SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; - int32_t len = strlen(pRow->rowData); + int32_t len = (int32_t)strlen(pRow->rowData); if (len >= TSDB_TABLE_FNAME_LEN){ free(pStable); return TSDB_CODE_MND_INVALID_TABLE_ID; @@ -533,7 +533,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) { pStable->info.tableId = strdup(pRow->rowData); len++; - memcpy((char*)pStable + sizeof(char *), pRow->rowData + len, tsSuperTableUpdateSize); + memcpy((char *)pStable + sizeof(char *), (char *)pRow->rowData + len, tsSuperTableUpdateSize); len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); @@ -543,7 +543,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) { return TSDB_CODE_MND_NOT_SUPER_TABLE; } - memcpy(pStable->schema, pRow->rowData + len, schemaSize); + memcpy(pStable->schema, (char *)pRow->rowData + len, schemaSize); pRow->pObj = pStable; @@ -556,14 +556,14 @@ static int32_t mnodeSuperTableActionRestored() { static int32_t mnodeInitSuperTables() { SSTableObj tObj; - tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; + tsSuperTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type); SSdbTableDesc desc = { .id = SDB_TABLE_STABLE, .name = "stables", .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, .maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_VAR_STRING, .fpInsert = mnodeSuperTableActionInsert, .fpDelete = mnodeSuperTableActionDelete, @@ -1266,7 +1266,7 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c } // int32_t rowSize = 0; - uint32_t len = strlen(newTagName); + uint32_t len = (int32_t)strlen(newTagName); if (len >= TSDB_COL_NAME_LEN) { return TSDB_CODE_MND_COL_NAME_TOO_LONG; } @@ -1429,7 +1429,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char } // int32_t rowSize = 0; - uint32_t len = strlen(newName); + uint32_t len = (uint32_t)strlen(newName); if (len >= TSDB_COL_NAME_LEN) { return TSDB_CODE_MND_COL_NAME_TOO_LONG; } @@ -1534,7 +1534,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, tstrncpy(prefix, pDb->name, 64); strcat(prefix, TS_PATH_DELIMITER); - prefixLen = strlen(prefix); + prefixLen = (int32_t)strlen(prefix); SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; char stableName[TSDB_TABLE_NAME_LEN] = {0}; @@ -1559,7 +1559,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - int16_t len = strnlen(stableName, TSDB_TABLE_NAME_LEN - 1); + int16_t len = (int16_t)strnlen(stableName, TSDB_TABLE_NAME_LEN - 1); *(int16_t*) pWrite = len; pWrite += sizeof(int16_t); // todo refactor @@ -1602,7 +1602,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) { char prefix[64] = {0}; tstrncpy(prefix, pDropDb->name, 64); strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = strlen(prefix); + int32_t prefixLen = (int32_t)strlen(prefix); mInfo("db:%s, all super tables will be dropped from sdb", pDropDb->name); @@ -1755,9 +1755,9 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { rpcFreeCont(pRsp); return TSDB_CODE_MND_INVALID_TABLE_NAME; } else { - pRsp->numOfTables = htonl(pRsp->numOfTables); + pRsp->numOfTables = (int32_t)htonl(pRsp->numOfTables); pMsg->rpcRsp.rsp = pRsp; - pMsg->rpcRsp.len = msg - (char *)pRsp; + pMsg->rpcRsp.len = (int32_t)((char *)msg - (char *)pRsp); return TSDB_CODE_SUCCESS; } @@ -2030,7 +2030,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { //SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here. - SCreateTableMsg* pCreate = (SCreateTableMsg*)(pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg)); + SCreateTableMsg* pCreate = (SCreateTableMsg*)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg)); int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); if (code != TSDB_CODE_SUCCESS) { @@ -2287,7 +2287,7 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char } // int32_t rowSize = 0; - uint32_t len = strlen(newName); + uint32_t len = (uint32_t)strlen(newName); if (len >= TSDB_COL_NAME_LEN) { return TSDB_CODE_MND_COL_NAME_TOO_LONG; } @@ -2491,7 +2491,7 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) { char prefix[64] = {0}; tstrncpy(prefix, pDropDb->name, 64); strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = strlen(prefix); + int32_t prefixLen = (int32_t)strlen(prefix); mInfo("db:%s, all child tables will be dropped from sdb", pDropDb->name); @@ -2907,7 +2907,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; char prefix[64] = {0}; - int32_t prefixLen = tableIdPrefix(pDb->name, prefix, 64); + int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64); char* pattern = NULL; if (pShow->payloadLen > 0) { @@ -3143,7 +3143,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro char prefix[64] = {0}; tstrncpy(prefix, pDb->name, 64); strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = strlen(prefix); + int32_t prefixLen = (int32_t)strlen(prefix); while (numOfRows < rows) { pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index fb26086d04..3a699757d9 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -128,7 +128,7 @@ static void mnodePrintUserAuth() { } static int32_t mnodeUserActionRestored() { - int32_t numOfRows = sdbGetNumOfRows(tsUserSdb); + int64_t numOfRows = sdbGetNumOfRows(tsUserSdb); if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mInfo("dnode first deploy, create root user"); SAcctObj *pAcct = mnodeGetAcct(TSDB_DEFAULT_USER); @@ -148,14 +148,14 @@ static int32_t mnodeUserActionRestored() { int32_t mnodeInitUsers() { SUserObj tObj; - tsUserUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsUserUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); SSdbTableDesc desc = { .id = SDB_TABLE_USER, .name = "users", .hashSessions = TSDB_DEFAULT_USERS_HASH_SIZE, .maxRowSize = tsUserUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_STRING, .fpInsert = mnodeUserActionInsert, .fpDelete = mnodeUserActionDelete, @@ -204,11 +204,11 @@ void mnodeCancelGetNextUser(void *pIter) { } void mnodeIncUserRef(SUserObj *pUser) { - return sdbIncRef(tsUserSdb, pUser); + sdbIncRef(tsUserSdb, pUser); } void mnodeDecUserRef(SUserObj *pUser) { - return sdbDecRef(tsUserSdb, pUser); + sdbDecRef(tsUserSdb, pUser); } static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { @@ -561,7 +561,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) { void mnodeDropAllUsers(SAcctObj *pAcct) { void * pIter = NULL; int32_t numOfUsers = 0; - int32_t acctNameLen = strlen(pAcct->user); + int32_t acctNameLen = (int32_t)strlen(pAcct->user); SUserObj *pUser = NULL; while (1) { diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 827be0687d..bd89f2a267 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -206,14 +206,14 @@ static int32_t mnodeVgroupActionRestored() { int32_t mnodeInitVgroups() { SVgObj tObj; - tsVgUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsVgUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj); SSdbTableDesc desc = { .id = SDB_TABLE_VGROUP, .name = "vgroups", .hashSessions = TSDB_DEFAULT_VGROUPS_HASH_SIZE, .maxRowSize = tsVgUpdateSize, - .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj), .keyType = SDB_KEY_AUTO, .fpInsert = mnodeVgroupActionInsert, .fpDelete = mnodeVgroupActionDelete, @@ -245,11 +245,11 @@ int32_t mnodeInitVgroups() { } void mnodeIncVgroupRef(SVgObj *pVgroup) { - return sdbIncRef(tsVgroupSdb, pVgroup); + sdbIncRef(tsVgroupSdb, pVgroup); } void mnodeDecVgroupRef(SVgObj *pVgroup) { - return sdbDecRef(tsVgroupSdb, pVgroup); + sdbDecRef(tsVgroupSdb, pVgroup); } SVgObj *mnodeGetVgroup(int32_t vgId) { diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h index c9b3b9cd76..91ac5c9958 100644 --- a/src/os/inc/osFile.h +++ b/src/os/inc/osFile.h @@ -37,7 +37,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP } // TAOS_OS_FUNC_FILE_SENDIFLE -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size); #ifdef TAOS_RANDOM_FILE_FAIL diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h index 13d3fa4079..4e2671ce2b 100644 --- a/src/os/inc/osSocket.h +++ b/src/os/inc/osSocket.h @@ -61,6 +61,7 @@ extern "C" { int32_t taosSetNonblocking(SOCKET sock, int32_t on); void taosIgnSIGPIPE(); void taosBlockSIGPIPE(); +void taosSetMaskSIGPIPE(); // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 2e6886aa21..6ba38635f4 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -121,7 +121,7 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { #ifndef TAOS_OS_FUNC_FILE_SENDIFLE -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size) { int64_t leftbytes = size; int64_t sentbytes; diff --git a/src/os/src/detail/osSocket.c b/src/os/src/detail/osSocket.c index 729471247f..d03e8086bf 100644 --- a/src/os/src/detail/osSocket.c +++ b/src/os/src/detail/osSocket.c @@ -53,6 +53,16 @@ void taosBlockSIGPIPE() { } } +void taosSetMaskSIGPIPE() { + sigset_t signal_mask; + sigemptyset(&signal_mask); + sigaddset(&signal_mask, SIGPIPE); + int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL); + if (rc != 0) { + uError("failed to setmask SIGPIPE"); + } +} + #endif #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT diff --git a/src/os/src/windows/wFile.c b/src/os/src/windows/wFile.c index 2204135ae6..a127c17ce7 100644 --- a/src/os/src/windows/wFile.c +++ b/src/os/src/windows/wFile.c @@ -78,7 +78,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co return writeLen; } -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) { +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t* offset, int64_t size) { uError("taosSendFile no implemented yet"); return 0; } diff --git a/src/os/src/windows/wSocket.c b/src/os/src/windows/wSocket.c index 9697c5e65f..4e6ee15e77 100644 --- a/src/os/src/windows/wSocket.c +++ b/src/os/src/windows/wSocket.c @@ -48,6 +48,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { void taosIgnSIGPIPE() {} void taosBlockSIGPIPE() {} +void taosSetMaskSIGPIPE() {} int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 5d8b52986a..86d857876d 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -9,7 +9,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) -IF (TD_LINUX) +IF (TD_LINUX OR TD_WINDOWS) ADD_LIBRARY(http ${SRC}) TARGET_LINK_LIBRARIES(http z) diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index ebdfabf310..ada67efbfd 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -166,7 +166,7 @@ typedef struct HttpThread { HttpContext * pHead; pthread_mutex_t threadMutex; bool stop; - int32_t pollFd; + SOCKET pollFd; int32_t numOfContexts; int32_t threadId; char label[HTTP_LABEL_SIZE]; @@ -177,7 +177,7 @@ typedef struct HttpServer { char label[HTTP_LABEL_SIZE]; uint32_t serverIp; uint16_t serverPort; - int32_t fd; + SOCKET fd; int32_t numOfThreads; int32_t methodScannerLen; int32_t requestNum; diff --git a/src/plugins/http/src/httpGcJson.c b/src/plugins/http/src/httpGcJson.c index 2c9eca11de..3a053afd87 100644 --- a/src/plugins/http/src/httpGcJson.c +++ b/src/plugins/http/src/httpGcJson.c @@ -158,7 +158,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, if (row[i]!= NULL){ len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name); memcpy(target + len, (char *) row[i], length[i]); - len = strlen(target); + len = (int32_t)strlen(target); } break; default: diff --git a/src/plugins/http/src/httpGzip.c b/src/plugins/http/src/httpGzip.c index 54f900c755..d94e3b1235 100644 --- a/src/plugins/http/src/httpGzip.c +++ b/src/plugins/http/src/httpGzip.c @@ -133,7 +133,7 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) { if (ret!=Z_STREAM_END) continue; } - int32_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; + int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef*)gzip->chunk); gzip->gzip->next_out[0] = '\0'; gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); @@ -155,7 +155,7 @@ int32_t ehttp_gzip_finish(ehttp_gzip_t *gzip) { if (ret != Z_STREAM_END) return -1; - int32_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; + int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef*)gzip->chunk); gzip->gzip->next_out[0] = '\0'; gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); diff --git a/src/plugins/http/src/httpJson.c b/src/plugins/http/src/httpJson.c index 1aa6cfac4b..132553ffe1 100644 --- a/src/plugins/http/src/httpJson.c +++ b/src/plugins/http/src/httpJson.c @@ -93,7 +93,7 @@ int32_t httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int32 int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { int32_t remain = 0; char sLen[24]; - uint64_t srcLen = (uint64_t) (buf->lst - buf->buf); + int32_t srcLen = (int32_t) (buf->lst - buf->buf); if (buf->pContext->fd <= 0) { httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd); @@ -113,11 +113,11 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd); return 0; // there is no data to dump. } else { - int32_t len = sprintf(sLen, "%" PRIx64 "\r\n", srcLen); - httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", response:\n%s", buf->pContext, buf->pContext->fd, + int32_t len = sprintf(sLen, "%d\r\n", srcLen); + httpTrace("context:%p, fd:%d, write body, chunkSize:%d, response:\n%s", buf->pContext, buf->pContext->fd, srcLen, buf->buf); httpWriteBufNoTrace(buf->pContext, sLen, len); - remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int32_t)srcLen); + remain = httpWriteBufNoTrace(buf->pContext, buf->buf, srcLen); } } else { char compressBuf[JSON_BUFFER_SIZE] = {0}; @@ -126,7 +126,7 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { if (ret == 0) { if (compressBufLen > 0) { int32_t len = sprintf(sLen, "%x\r\n", compressBufLen); - httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", + httpTrace("context:%p, fd:%d, write body, chunkSize:%d, compressSize:%d, last:%d, response:\n%s", buf->pContext, buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf); httpWriteBufNoTrace(buf->pContext, sLen, len); remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen); @@ -136,7 +136,7 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { remain = 0; // there is no data to dump. } } else { - httpError("context:%p, fd:%d, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s", + httpError("context:%p, fd:%d, failed to compress data, chunkSize:%d, last:%d, error:%d, response:\n%s", buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf); remain = 0; } diff --git a/src/plugins/http/src/httpParser.c b/src/plugins/http/src/httpParser.c index b844834537..331ca079fa 100644 --- a/src/plugins/http/src/httpParser.c +++ b/src/plugins/http/src/httpParser.c @@ -153,7 +153,7 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target for (int32_t i = 0; i < HTTP_MAX_URL; i++) { char *pSeek = strchr(pStart, '/'); if (pSeek == NULL) { - (void)httpAppendString(pParser->path + i, pStart, strlen(pStart)); + (void)httpAppendString(pParser->path + i, pStart, (int32_t)strlen(pStart)); break; } else { (void)httpAppendString(pParser->path + i, pStart, (int32_t)(pSeek - pStart)); @@ -285,7 +285,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const free(t); free(s); httpTrace("context:%p, fd:%d, basic auth:%s", pContext, pContext->fd, parser->authContent); - int32_t ok = httpParseBasicAuthToken(pContext, parser->authContent, strlen(parser->authContent)); + int32_t ok = httpParseBasicAuthToken(pContext, parser->authContent, (int32_t)strlen(parser->authContent)); if (ok != 0) { httpOnError(parser, 0, TSDB_CODE_HTTP_INVALID_BASIC_AUTH); return -1; @@ -299,7 +299,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const free(t); free(s); httpTrace("context:%p, fd:%d, taosd auth:%s", pContext, pContext->fd, parser->authContent); - int32_t ok = httpParseTaosdAuthToken(pContext, parser->authContent, strlen(parser->authContent)); + int32_t ok = httpParseTaosdAuthToken(pContext, parser->authContent, (int32_t)strlen(parser->authContent)); if (ok != 0) { httpOnError(parser, 0, TSDB_CODE_HTTP_INVALID_TAOSD_AUTH); return -1; @@ -524,14 +524,14 @@ char *httpDecodeUrl(const char *enc) { int32_t hex, cnt; int32_t n = sscanf(p+1, "%2x%n", &hex, &cnt); if (n!=1 && cnt !=2) { ok = 0; break; } - if (httpAppendString(&str, enc, p-enc)) { ok = 0; break; } + if (httpAppendString(&str, enc, (int32_t)(p-enc))) { ok = 0; break; } char c = (char)hex; if (httpAppendString(&str, &c, 1)) { ok = 0; break; } enc = p+3; } char *dec = NULL; if (ok && *enc) { - if (httpAppendString(&str, enc, strlen(enc))) { ok = 0; } + if (httpAppendString(&str, enc, (int32_t)strlen(enc))) { ok = 0; } } if (ok) { dec = str.str; @@ -667,7 +667,7 @@ static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state, int32_t ok = 0; do { const char *prefix = "HTTP/1."; - int32_t len = strlen(prefix); + int32_t len = (int32_t)strlen(prefix); if (parser->str.pos < len) { if (prefix[parser->str.pos] != c) { httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); @@ -811,7 +811,7 @@ static int32_t httpParserOnCrlf(HttpParser *parser, HTTP_PARSER_STATE state, con int32_t ok = 0; do { const char *s = "\r\n"; - int32_t len = strlen(s); + int32_t len = (int32_t)strlen(s); if (s[parser->str.pos] != c) { httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); ok = -1; diff --git a/src/plugins/http/src/httpQueue.c b/src/plugins/http/src/httpQueue.c index 1c039abb4d..7ae54717f3 100644 --- a/src/plugins/http/src/httpQueue.c +++ b/src/plugins/http/src/httpQueue.c @@ -134,14 +134,14 @@ void httpCleanupResultQueue() { for (int32_t i = 0; i < tsHttpPool.num; ++i) { SHttpWorker *pWorker = tsHttpPool.httpWorker + i; - if (pWorker->thread) { + if (taosCheckPthreadValid(pWorker->thread)) { taosQsetThreadResume(tsHttpQset); } } for (int32_t i = 0; i < tsHttpPool.num; ++i) { SHttpWorker *pWorker = tsHttpPool.httpWorker + i; - if (pWorker->thread) { + if (taosCheckPthreadValid(pWorker->thread)) { pthread_join(pWorker->thread, NULL); } } diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index 4896d50c6c..ee6addd6fa 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -25,10 +25,6 @@ #include "httpResp.h" #include "httpUtil.h" -#ifndef EPOLLWAKEUP - #define EPOLLWAKEUP (1u << 29) -#endif - static bool httpReadData(HttpContext *pContext); static void httpStopThread(HttpThread* pThread) { @@ -49,10 +45,10 @@ static void httpStopThread(HttpThread* pThread) { pthread_join(pThread->thread, NULL); if (fd != -1) { - close(fd); + taosCloseSocket(fd); } - close(pThread->pollFd); + taosCloseSocket(pThread->pollFd); pthread_mutex_destroy(&(pThread->threadMutex)); } @@ -77,10 +73,7 @@ static void httpProcessHttpData(void *param) { HttpContext *pContext; int32_t fdNum; - sigset_t set; - sigemptyset(&set); - sigaddset(&set, SIGPIPE); - pthread_sigmask(SIG_SETMASK, &set, NULL); + taosSetMaskSIGPIPE(); while (1) { struct epoll_event events[HTTP_MAX_EVENTS]; @@ -162,10 +155,7 @@ static void *httpAcceptHttpConnection(void *arg) { HttpContext * pContext = NULL; int32_t totalFds = 0; - sigset_t set; - sigemptyset(&set); - sigaddset(&set, SIGPIPE); - pthread_sigmask(SIG_SETMASK, &set, NULL); + taosSetMaskSIGPIPE(); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); @@ -242,7 +232,7 @@ static void *httpAcceptHttpConnection(void *arg) { threadId = threadId % pServer->numOfThreads; } - close(pServer->fd); + taosCloseSocket(pServer->fd); return NULL; } @@ -265,7 +255,7 @@ bool httpInitConnect() { return false; } - pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter + pThread->pollFd = (SOCKET)epoll_create(HTTP_MAX_EVENTS); // size does not matter if (pThread->pollFd < 0) { httpError("http thread:%s, failed to create HTTP epoll", pThread->label); pthread_mutex_destroy(&(pThread->threadMutex)); diff --git a/src/plugins/http/src/httpTgHandle.c b/src/plugins/http/src/httpTgHandle.c index e2b57b87bb..f9c8860afe 100644 --- a/src/plugins/http/src/httpTgHandle.c +++ b/src/plugins/http/src/httpTgHandle.c @@ -276,7 +276,7 @@ int32_t tgReadSchema(char *fileName) { rewind(fp); char * content = (char *)calloc(contentSize + 1, 1); - int32_t result = fread(content, 1, contentSize, fp); + int32_t result = (int32_t)fread(content, 1, contentSize, fp); if (result != contentSize) { httpError("failed to read telegraf schema file:%s", fileName); diff --git a/src/plugins/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c index 39168ee96d..51333d2118 100644 --- a/src/plugins/http/src/httpUtil.c +++ b/src/plugins/http/src/httpUtil.c @@ -388,7 +388,7 @@ int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int3 if (inflateEnd(&gzipStream) != Z_OK) { return -4; } - *nDestData = gzipStream.total_out; + *nDestData = (int32_t)gzipStream.total_out; return 0; } @@ -417,7 +417,7 @@ int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData, return -1; } - int32_t cacheLen = pContext->gzipStream.total_out - lastTotalLen; + int32_t cacheLen = (int32_t)(pContext->gzipStream.total_out - lastTotalLen); if (cacheLen >= *nDestData) { return -2; } diff --git a/src/plugins/monitor/CMakeLists.txt b/src/plugins/monitor/CMakeLists.txt index edea2187ea..c9314fe1cb 100644 --- a/src/plugins/monitor/CMakeLists.txt +++ b/src/plugins/monitor/CMakeLists.txt @@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) AUX_SOURCE_DIRECTORY(./src SRC) -IF (TD_LINUX) +IF (TD_LINUX OR TD_WINDOWS) ADD_LIBRARY(monitor ${SRC}) IF (TD_SOMODE_STATIC) diff --git a/src/plugins/monitor/src/monMain.c b/src/plugins/monitor/src/monMain.c index 9443b1ce12..85d28bf0a5 100644 --- a/src/plugins/monitor/src/monMain.c +++ b/src/plugins/monitor/src/monMain.c @@ -79,8 +79,8 @@ int32_t monInitSystem() { strcpy(tsMonitor.ep, tsLocalEp); } - int len = strlen(tsMonitor.ep); - for (int i = 0; i < len; ++i) { + int32_t len = (int32_t)strlen(tsMonitor.ep); + for (int32_t i = 0; i < len; ++i) { if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') { tsMonitor.ep[i] = '_'; } @@ -148,7 +148,7 @@ static void *monThreadFunc(void *param) { } if (tsMonitor.state == MON_STATE_NOT_INIT) { - int code = 0; + int32_t code = 0; for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) { monBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); @@ -330,7 +330,7 @@ static void monSaveSystemInfo() { pos += monBuildReqSql(sql + pos); void *res = taos_query(tsMonitor.conn, tsMonitor.sql); - int code = taos_errno(res); + int32_t code = taos_errno(res); taos_free_result(res); if (code != 0) { diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 178b96c423..d11de3bc58 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -132,7 +132,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter + pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); code = -1; diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt index aa38a56f38..7a0ea85867 100644 --- a/src/sync/CMakeLists.txt +++ b/src/sync/CMakeLists.txt @@ -4,7 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) -IF (TD_LINUX) +IF (TD_LINUX OR TD_WINDOWS) LIST(REMOVE_ITEM SRC src/syncArbitrator.c) ADD_LIBRARY(sync ${SRC}) TARGET_LINK_LIBRARIES(sync tutil pthread common) diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index eef687d647..e43140d4e6 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -82,8 +82,8 @@ typedef struct SsyncPeer { uint64_t sversion; // track the peer version in retrieve process uint64_t lastFileVer; // track the file version while retrieve uint64_t lastWalVer; // track the wal version while retrieve - int32_t syncFd; - int32_t peerFd; // forward FD + SOCKET syncFd; + SOCKET peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried int32_t fileChanged; // a flag to indicate file is changed during retrieving process int32_t refCount; diff --git a/src/sync/inc/syncTcp.h b/src/sync/inc/syncTcp.h index d4674fee6b..b322c3440c 100644 --- a/src/sync/inc/syncTcp.h +++ b/src/sync/inc/syncTcp.h @@ -27,12 +27,12 @@ typedef struct { int32_t bufferSize; void (*processBrokenLink)(int64_t handleId); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer); - void (*processIncomingConn)(int32_t fd, uint32_t ip); + void (*processIncomingConn)(SOCKET fd, uint32_t ip); } SPoolInfo; void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void syncCloseTcpThreadPool(void *); -void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd); +void *syncAllocateTcpConn(void *, int64_t rid, SOCKET connFd); void syncFreeTcpConn(void *); #ifdef __cplusplus diff --git a/src/sync/src/syncArbitrator.c b/src/sync/src/syncArbitrator.c index fed0774346..187e7a3b46 100644 --- a/src/sync/src/syncArbitrator.c +++ b/src/sync/src/syncArbitrator.c @@ -27,8 +27,12 @@ #include "syncInt.h" #include "syncTcp.h" -static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); -static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +#ifndef SIGHUP + #define SIGHUP SIGTERM +#endif + +static void arbSignalHandler(int32_t signum); +static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void arbProcessBrokenLink(int64_t rid); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer); static tsem_t tsArbSem; @@ -36,7 +40,7 @@ static void * tsArbTcpPool; typedef struct { char id[TSDB_EP_LEN + 24]; - int32_t nodeFd; + SOCKET nodeFd; void * pConn; } SNodeConn; @@ -70,8 +74,9 @@ int32_t main(int32_t argc, char *argv[]) { /* Set termination handler. */ struct sigaction act = {{0}}; - act.sa_flags = SA_SIGINFO; - act.sa_sigaction = arbSignalHandler; + memset(&act, 0, sizeof(struct sigaction)); + + act.sa_handler = arbSignalHandler; sigaction(SIGTERM, &act, NULL); sigaction(SIGHUP, &act, NULL); sigaction(SIGINT, &act, NULL); @@ -103,12 +108,11 @@ int32_t main(int32_t argc, char *argv[]) { syncCloseTcpThreadPool(tsArbTcpPool); sInfo("TAOS arbitrator is shut down"); - closelog(); return 0; } -static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { +static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { char ipstr[24]; tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); @@ -172,15 +176,18 @@ static int32_t arbProcessPeerMsg(int64_t rid, void *buffer) { return 0; } -static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { +static void arbSignalHandler(int32_t signum) { struct sigaction act = {{0}}; act.sa_handler = SIG_IGN; sigaction(SIGTERM, &act, NULL); sigaction(SIGHUP, &act, NULL); sigaction(SIGINT, &act, NULL); +#ifndef WINDOWS sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); - +#else + sInfo("shut down signal is %d", signum); +#endif // inform main thread to exit tsem_post(&tsArbSem); } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 98100fbdd8..b43f5824be 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -45,7 +45,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static void syncProcessBrokenLink(int64_t rid); static int32_t syncProcessPeerMsg(int64_t rid, void *buffer); -static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); static void syncFreeNode(void *); @@ -544,7 +544,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd); taosTmrStopA(&pPeer->timer); - taosClose(pPeer->syncFd); + taosCloseSocket(pPeer->syncFd); if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; void *pConn = pPeer->pConn; @@ -869,7 +869,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { if (nodeRole != TAOS_SYNC_ROLE_MASTER) { sError("%s, I am not master anymore", pPeer->id); - taosClose(pPeer->syncFd); + taosCloseSocket(pPeer->syncFd); return; } @@ -1114,7 +1114,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { return; } - int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + SOCKET connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); @@ -1132,7 +1132,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { if (pPeer->isArb) tsArbOnline = 1; } else { sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); - taosClose(connFd); + taosCloseSocket(connFd); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); } } @@ -1171,7 +1171,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; nodeSStatus = TAOS_SYNC_STATUS_INIT; sError("%s, failed to create sync restore thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); - taosClose(pPeer->syncFd); + taosCloseSocket(pPeer->syncFd); syncReleasePeer(pPeer); } else { sInfo("%s, sync restore thread:0x%08" PRIx64 " create successfully, rid:%" PRId64, pPeer->id, @@ -1179,7 +1179,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { } } -static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { +static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { char ipstr[24]; int32_t i; diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 78520c6608..bd4f9bd3e1 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -365,7 +365,7 @@ void *syncRestoreData(void *param) { SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); - __sync_fetch_and_add(&tsSyncNum, 1); + atomic_add_fetch_32(&tsSyncNum, 1); sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); @@ -390,9 +390,9 @@ void *syncRestoreData(void *param) { nodeSStatus = TAOS_SYNC_STATUS_INIT; sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); - taosClose(pPeer->syncFd); + taosCloseSocket(pPeer->syncFd); syncCloseRecvBuffer(pNode); - __sync_fetch_and_sub(&tsSyncNum, 1); + atomic_sub_fetch_32(&tsSyncNum, 1); // The ref is obtained in both the create thread and the current thread, so it is released twice syncReleasePeer(pPeer); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 153886102e..9ff7955351 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -14,7 +14,6 @@ */ #define _DEFAULT_SOURCE -#include #include "os.h" #include "taoserror.h" #include "tlog.h" @@ -160,7 +159,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { break; } - ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); + ret = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); if (ret < 0) { code = -1; @@ -228,7 +227,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi return -1; } - int32_t code = taosLSeek(sfd, offset, SEEK_SET); + int32_t code = (int32_t)taosLSeek(sfd, offset, SEEK_SET); if (code < 0) { sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); close(sfd); @@ -322,7 +321,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) // if all data are read out, and no update if (bytes == 0 && !walModified) { // wal not closed, it means some data not flushed to disk, wait for a while - usleep(10000); + taosMsleep(10); } // if bytes > 0, file is updated, or fversion is not reached but file still open, read again @@ -384,7 +383,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { break; } - code = taosSendFile(pPeer->syncFd, sfd, NULL, size); + code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size); close(sfd); if (code < 0) { sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); @@ -501,7 +500,7 @@ void *syncRetrieveData(void *param) { } pPeer->fileChanged = 0; - taosClose(pPeer->syncFd); + taosCloseSocket(pPeer->syncFd); // The ref is obtained in both the create thread and the current thread, so it is released twice sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 4744666737..829c9ceec6 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -24,10 +24,18 @@ #include "syncInt.h" #include "syncTcp.h" +#ifdef WINDOWS +#include "wepoll.h" +#endif + +#ifndef EPOLLWAKEUP + #define EPOLLWAKEUP (1u << 29) +#endif + typedef struct SThreadObj { pthread_t thread; bool stop; - int32_t pollFd; + SOCKET pollFd; int32_t numOfFds; struct SPoolObj *pPool; } SThreadObj; @@ -37,13 +45,13 @@ typedef struct SPoolObj { SThreadObj **pThread; pthread_t thread; int32_t nextId; - int32_t acceptFd; // FD for accept new connection + SOCKET acceptFd; // FD for accept new connection } SPoolObj; typedef struct { SThreadObj *pThread; int64_t handleId; - int32_t fd; + SOCKET fd; int32_t closedByApp; } SConnObj; @@ -82,7 +90,7 @@ void *syncOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) { sError("failed to create accept thread for TCP server since %s", strerror(errno)); - close(pPool->acceptFd); + taosCloseSocket(pPool->acceptFd); tfree(pPool->pThread); tfree(pPool); return NULL; @@ -112,7 +120,7 @@ void syncCloseTcpThreadPool(void *param) { tfree(pPool); } -void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { +void *syncAllocateTcpConn(void *param, int64_t rid, SOCKET connFd) { struct epoll_event event; SPoolObj *pPool = param; @@ -169,7 +177,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { pThread->numOfFds--; epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); - taosClose(pConn->fd); + taosCloseSocket(pConn->fd); tfree(pConn); } @@ -233,7 +241,7 @@ static void *syncProcessTcpData(void *param) { sDebug("%p TCP epoll thread exits", pThread); - close(pThread->pollFd); + taosCloseSocket(pThread->pollFd); tfree(pThread); tfree(buffer); return NULL; @@ -248,7 +256,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) { while (1) { struct sockaddr_in clientAddr; socklen_t addrlen = sizeof(clientAddr); - int32_t connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); + SOCKET connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); if (connFd < 0) { if (errno == EINVAL) { sDebug("%p TCP server accept is exiting...", pPool); @@ -264,7 +272,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) { (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); } - taosClose(pPool->acceptFd); + taosCloseSocket(pPool->acceptFd); return NULL; } @@ -277,7 +285,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { if (pThread == NULL) return NULL; pThread->pPool = pPool; - pThread->pollFd = epoll_create(10); // size does not matter + pThread->pollFd = (SOCKET)epoll_create(10); // size does not matter if (pThread->pollFd < 0) { tfree(pThread); return NULL; @@ -290,7 +298,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { pthread_attr_destroy(&thattr); if (ret != 0) { - close(pThread->pollFd); + taosCloseSocket(pThread->pollFd); tfree(pThread); return NULL; } diff --git a/src/util/inc/tsocket.h b/src/util/inc/tsocket.h index a339955cc0..6b449d6bc9 100644 --- a/src/util/inc/tsocket.h +++ b/src/util/inc/tsocket.h @@ -20,6 +20,14 @@ extern "C" { #endif +#ifdef WINDOWS +#include "wepoll.h" +#endif + +#ifndef EPOLLWAKEUP + #define EPOLLWAKEUP (1u << 29) +#endif + int32_t taosReadn(SOCKET sock, char *buffer, int32_t len); int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes); int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes); diff --git a/src/vnode/src/vnodeCfg.c b/src/vnode/src/vnodeCfg.c index 0b32f97939..1ea774afae 100644 --- a/src/vnode/src/vnodeCfg.c +++ b/src/vnode/src/vnodeCfg.c @@ -78,7 +78,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { goto PARSE_VCFG_ERROR; } - len = fread(content, 1, maxLen, fp); + len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); goto PARSE_VCFG_ERROR; @@ -103,14 +103,14 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.dbCfgVersion = dbCfgVersion->valueint; + vnodeMsg.cfg.dbCfgVersion = (int32_t)dbCfgVersion->valueint; cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion"); if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) { vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file); vnodeMsg.cfg.vgCfgVersion = 0; } else { - vnodeMsg.cfg.vgCfgVersion = vgCfgVersion->valueint; + vnodeMsg.cfg.vgCfgVersion = (int32_t)vgCfgVersion->valueint; } cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); @@ -118,56 +118,56 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { vError("vgId:%d, failed to read %s, cacheBlockSize not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.cacheBlockSize = cacheBlockSize->valueint; + vnodeMsg.cfg.cacheBlockSize = (int32_t)cacheBlockSize->valueint; cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); if (!totalBlocks || totalBlocks->type != cJSON_Number) { vError("vgId:%d, failed to read %s, totalBlocks not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.totalBlocks = totalBlocks->valueint; + vnodeMsg.cfg.totalBlocks = (int32_t)totalBlocks->valueint; cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); if (!daysPerFile || daysPerFile->type != cJSON_Number) { vError("vgId:%d, failed to read %s, daysPerFile not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.daysPerFile = daysPerFile->valueint; + vnodeMsg.cfg.daysPerFile = (int32_t)daysPerFile->valueint; cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep"); if (!daysToKeep || daysToKeep->type != cJSON_Number) { vError("vgId:%d, failed to read %s, daysToKeep not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.daysToKeep = daysToKeep->valueint; + vnodeMsg.cfg.daysToKeep = (int32_t)daysToKeep->valueint; cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { vError("vgId:%d, failed to read %s, daysToKeep1 not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.daysToKeep1 = daysToKeep1->valueint; + vnodeMsg.cfg.daysToKeep1 = (int32_t)daysToKeep1->valueint; cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { vError("vgId:%d, failed to read %s, daysToKeep2 not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.daysToKeep2 = daysToKeep2->valueint; + vnodeMsg.cfg.daysToKeep2 = (int32_t)daysToKeep2->valueint; cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint; + vnodeMsg.cfg.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint; cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; + vnodeMsg.cfg.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint; cJSON *precision = cJSON_GetObjectItem(root, "precision"); if (!precision || precision->type != cJSON_Number) { @@ -195,7 +195,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { vError("vgId:%d, failed to read %s, fsyncPeriod not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - vnodeMsg.cfg.fsyncPeriod = fsyncPeriod->valueint; + vnodeMsg.cfg.fsyncPeriod = (int32_t)fsyncPeriod->valueint; cJSON *wals = cJSON_GetObjectItem(root, "wals"); if (!wals || wals->type != cJSON_Number) { @@ -258,7 +258,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { vError("vgId:%d, failed to read %s, nodeId not found", pVnode->vgId, file); goto PARSE_VCFG_ERROR; } - node->nodeId = nodeId->valueint; + node->nodeId = (int32_t)nodeId->valueint; cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { diff --git a/src/wal/CMakeLists.txt b/src/wal/CMakeLists.txt index 359e09287a..2125e4f33c 100644 --- a/src/wal/CMakeLists.txt +++ b/src/wal/CMakeLists.txt @@ -4,7 +4,7 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) -IF (TD_LINUX) +IF (TD_LINUX OR TD_WINDOWS) ADD_LIBRARY(twal ${SRC}) TARGET_LINK_LIBRARIES(twal tutil common) ADD_SUBDIRECTORY(test) diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 72ea239817..62b066500c 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -210,7 +210,7 @@ static int32_t walCreateThread() { static void walStopThread() { tsWal.stop = 1; - if (tsWal.thread) { + if (taosCheckPthreadValid(tsWal.thread)) { pthread_join(tsWal.thread, NULL); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index e67127d6e4..0eda6ff786 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -272,7 +272,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch SWalHead *pHead = buffer; while (1) { - int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead)); + int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { @@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } } - ret = tfRead(tfd, pHead->cont, pHead->len); + ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); if (ret < 0) { wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno);