This commit is contained in:
Shengliang Guan 2021-01-12 22:27:43 +08:00
parent c93810acd6
commit e5c2b6aa10
47 changed files with 218 additions and 194 deletions

View File

@ -29,7 +29,7 @@ extern "C" {
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
VarDataLenT __len = strlen(str); \ VarDataLenT __len = (int32_t)strlen(str); \
*(VarDataLenT *)(x) = __len; \ *(VarDataLenT *)(x) = __len; \
memcpy(varDataVal(x), (str), __len); \ memcpy(varDataVal(x), (str), __len); \
} while (0); } while (0);
@ -42,7 +42,7 @@ extern "C" {
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ #define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \
do { \ do { \
*(VarDataLenT *)(x) = (_size); \ *(VarDataLenT *)(x) = (int32_t)(_size); \
memcpy(varDataVal(x), (str), (_size)); \ memcpy(varDataVal(x), (str), (_size)); \
} while (0); } while (0);

View File

@ -89,8 +89,8 @@ extern int32_t tsMinRowsInFileBlock;
extern int32_t tsMaxRowsInFileBlock; extern int32_t tsMaxRowsInFileBlock;
extern int16_t tsCommitTime; // seconds extern int16_t tsCommitTime; // seconds
extern int32_t tsTimePrecision; extern int32_t tsTimePrecision;
extern int16_t tsCompression; extern int8_t tsCompression;
extern int16_t tsWAL; extern int8_t tsWAL;
extern int32_t tsFsyncPeriod; extern int32_t tsFsyncPeriod;
extern int32_t tsReplications; extern int32_t tsReplications;
extern int32_t tsQuorum; extern int32_t tsQuorum;

View File

@ -122,8 +122,8 @@ int32_t tsMinRowsInFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
int32_t tsMaxRowsInFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK; int32_t tsMaxRowsInFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION; int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL; int8_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
int16_t tsWAL = TSDB_DEFAULT_WAL_LEVEL; int8_t tsWAL = TSDB_DEFAULT_WAL_LEVEL;
int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION; int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION; int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
@ -769,7 +769,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "comp"; cfg.option = "comp";
cfg.ptr = &tsCompression; cfg.ptr = &tsCompression;
cfg.valType = TAOS_CFG_VTYPE_INT16; cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_COMP_LEVEL; cfg.minValue = TSDB_MIN_COMP_LEVEL;
cfg.maxValue = TSDB_MAX_COMP_LEVEL; cfg.maxValue = TSDB_MAX_COMP_LEVEL;
@ -779,7 +779,7 @@ static void doInitGlobalConfig(void) {
cfg.option = "walLevel"; cfg.option = "walLevel";
cfg.ptr = &tsWAL; cfg.ptr = &tsWAL;
cfg.valType = TAOS_CFG_VTYPE_INT16; cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_WAL_LEVEL; cfg.minValue = TSDB_MIN_WAL_LEVEL;
cfg.maxValue = TSDB_MAX_WAL_LEVEL; cfg.maxValue = TSDB_MAX_WAL_LEVEL;

View File

@ -21,7 +21,7 @@
pthread_t pid; pthread_t pid;
static tsem_t cancelSem; static tsem_t cancelSem;
void shellQueryInterruptHandler(int signum) { void shellQueryInterruptHandler(int32_t signum) {
tsem_post(&cancelSem); tsem_post(&cancelSem);
} }

View File

@ -1,7 +1,7 @@
CMAKE_MINIMUM_REQUIRED(VERSION 2.8) CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)

View File

@ -81,7 +81,7 @@ static int32_t mnodeAcctActionDecode(SSdbRow *pRow) {
} }
static int32_t mnodeAcctActionRestored() { static int32_t mnodeAcctActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsAcctSdb); int64_t numOfRows = sdbGetNumOfRows(tsAcctSdb);
if (numOfRows <= 0 && dnodeIsFirstDeploy()) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create root acct"); mInfo("dnode first deploy, create root acct");
int32_t code = mnodeCreateRootAcct(); int32_t code = mnodeCreateRootAcct();
@ -97,14 +97,14 @@ static int32_t mnodeAcctActionRestored() {
int32_t mnodeInitAccts() { int32_t mnodeInitAccts() {
SAcctObj tObj; SAcctObj tObj;
tsAcctUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsAcctUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_ACCOUNT, .id = SDB_TABLE_ACCOUNT,
.name = "accounts", .name = "accounts",
.hashSessions = TSDB_DEFAULT_ACCOUNTS_HASH_SIZE, .hashSessions = TSDB_DEFAULT_ACCOUNTS_HASH_SIZE,
.maxRowSize = tsAcctUpdateSize, .maxRowSize = tsAcctUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_STRING, .keyType = SDB_KEY_STRING,
.fpInsert = mnodeAcctActionInsert, .fpInsert = mnodeAcctActionInsert,
.fpDelete = mnodeAcctActionDelete, .fpDelete = mnodeAcctActionDelete,
@ -206,7 +206,7 @@ void mnodeDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) {
} }
static int32_t mnodeCreateRootAcct() { static int32_t mnodeCreateRootAcct() {
int32_t numOfAccts = sdbGetNumOfRows(tsAcctSdb); int64_t numOfAccts = sdbGetNumOfRows(tsAcctSdb);
if (numOfAccts != 0) return TSDB_CODE_SUCCESS; if (numOfAccts != 0) return TSDB_CODE_SUCCESS;
SAcctObj *pAcct = malloc(sizeof(SAcctObj)); SAcctObj *pAcct = malloc(sizeof(SAcctObj));

View File

@ -68,7 +68,7 @@ static int32_t mnodeClusterActionDecode(SSdbRow *pRow) {
} }
static int32_t mnodeClusterActionRestored() { static int32_t mnodeClusterActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsClusterSdb); int64_t numOfRows = sdbGetNumOfRows(tsClusterSdb);
if (numOfRows <= 0 && dnodeIsFirstDeploy()) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create cluster"); mInfo("dnode first deploy, create cluster");
int32_t code = mnodeCreateCluster(); int32_t code = mnodeCreateCluster();
@ -84,14 +84,14 @@ static int32_t mnodeClusterActionRestored() {
int32_t mnodeInitCluster() { int32_t mnodeInitCluster() {
SClusterObj tObj; SClusterObj tObj;
tsClusterUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsClusterUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_CLUSTER, .id = SDB_TABLE_CLUSTER,
.name = "cluster", .name = "cluster",
.hashSessions = TSDB_DEFAULT_CLUSTER_HASH_SIZE, .hashSessions = TSDB_DEFAULT_CLUSTER_HASH_SIZE,
.maxRowSize = tsClusterUpdateSize, .maxRowSize = tsClusterUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_STRING, .keyType = SDB_KEY_STRING,
.fpInsert = mnodeClusterActionInsert, .fpInsert = mnodeClusterActionInsert,
.fpDelete = mnodeClusterActionDelete, .fpDelete = mnodeClusterActionDelete,
@ -139,7 +139,7 @@ void mnodeDecClusterRef(SClusterObj *pCluster) {
} }
static int32_t mnodeCreateCluster() { static int32_t mnodeCreateCluster() {
int32_t numOfClusters = sdbGetNumOfRows(tsClusterSdb); int64_t numOfClusters = sdbGetNumOfRows(tsClusterSdb);
if (numOfClusters != 0) return TSDB_CODE_SUCCESS; if (numOfClusters != 0) return TSDB_CODE_SUCCESS;
SClusterObj *pCluster = malloc(sizeof(SClusterObj)); SClusterObj *pCluster = malloc(sizeof(SClusterObj));
@ -226,7 +226,7 @@ static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows,
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *) pWrite = pCluster->createdTime; *(int64_t *) pWrite = pCluster->createdTime;
cols++; cols++;
mnodeDecClusterRef(pCluster); mnodeDecClusterRef(pCluster);

View File

@ -143,14 +143,14 @@ static int32_t mnodeDbActionRestored() {
int32_t mnodeInitDbs() { int32_t mnodeInitDbs() {
SDbObj tObj; SDbObj tObj;
tsDbUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsDbUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_DB, .id = SDB_TABLE_DB,
.name = "dbs", .name = "dbs",
.hashSessions = TSDB_DEFAULT_DBS_HASH_SIZE, .hashSessions = TSDB_DEFAULT_DBS_HASH_SIZE,
.maxRowSize = tsDbUpdateSize, .maxRowSize = tsDbUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_STRING, .keyType = SDB_KEY_STRING,
.fpInsert = mnodeDbActionInsert, .fpInsert = mnodeDbActionInsert,
.fpDelete = mnodeDbActionDelete, .fpDelete = mnodeDbActionDelete,
@ -192,11 +192,11 @@ SDbObj *mnodeGetDb(char *db) {
} }
void mnodeIncDbRef(SDbObj *pDb) { void mnodeIncDbRef(SDbObj *pDb) {
return sdbIncRef(tsDbSdb, pDb); sdbIncRef(tsDbSdb, pDb);
} }
void mnodeDecDbRef(SDbObj *pDb) { void mnodeDecDbRef(SDbObj *pDb) {
return sdbDecRef(tsDbSdb, pDb); sdbDecRef(tsDbSdb, pDb);
} }
SDbObj *mnodeGetDbByTableId(char *tableId) { SDbObj *mnodeGetDbByTableId(char *tableId) {

View File

@ -148,7 +148,7 @@ static int32_t mnodeDnodeActionDecode(SSdbRow *pRow) {
} }
static int32_t mnodeDnodeActionRestored() { static int32_t mnodeDnodeActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); int64_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
if (numOfRows <= 0 && dnodeIsFirstDeploy()) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create dnode:%s", tsLocalEp); mInfo("dnode first deploy, create dnode:%s", tsLocalEp);
mnodeCreateDnode(tsLocalEp, NULL); mnodeCreateDnode(tsLocalEp, NULL);
@ -165,7 +165,7 @@ static int32_t mnodeDnodeActionRestored() {
int32_t mnodeInitDnodes() { int32_t mnodeInitDnodes() {
SDnodeObj tObj; SDnodeObj tObj;
tsDnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsDnodeUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
pthread_mutex_init(&tsDnodeEpsMutex, NULL); pthread_mutex_init(&tsDnodeEpsMutex, NULL);
SSdbTableDesc desc = { SSdbTableDesc desc = {
@ -173,7 +173,7 @@ int32_t mnodeInitDnodes() {
.name = "dnodes", .name = "dnodes",
.hashSessions = TSDB_DEFAULT_DNODES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_DNODES_HASH_SIZE,
.maxRowSize = tsDnodeUpdateSize, .maxRowSize = tsDnodeUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_AUTO, .keyType = SDB_KEY_AUTO,
.fpInsert = mnodeDnodeActionInsert, .fpInsert = mnodeDnodeActionInsert,
.fpDelete = mnodeDnodeActionDelete, .fpDelete = mnodeDnodeActionDelete,
@ -227,7 +227,7 @@ void mnodeCancelGetNextDnode(void *pIter) {
} }
int32_t mnodeGetDnodesNum() { int32_t mnodeGetDnodesNum() {
return sdbGetNumOfRows(tsDnodeSdb); return (int32_t)sdbGetNumOfRows(tsDnodeSdb);
} }
int32_t mnodeGetOnlinDnodesCpuCoreNum() { int32_t mnodeGetOnlinDnodesCpuCoreNum() {
@ -407,7 +407,7 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
int64_t checkTime = 0; int64_t checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if ((0 != strncasecmp(clusterCfg->timezone, tsTimezone, strlen(tsTimezone))) && if ((0 != strncasecmp(clusterCfg->timezone, tsTimezone, strlen(tsTimezone))) &&
(checkTime != clusterCfg->checkTime)) { (checkTime != clusterCfg->checkTime)) {
mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg parameters inconsistent", clusterCfg->timezone, mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg parameters inconsistent", clusterCfg->timezone,
@ -638,9 +638,9 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
char *temp = strchr(dnodeEp, ':'); char *temp = strchr(dnodeEp, ':');
if (!temp) { if (!temp) {
int len = strlen(dnodeEp); int32_t len = (int32_t)strlen(dnodeEp);
if (dnodeEp[len - 1] == ';') dnodeEp[len - 1] = 0; if (dnodeEp[len - 1] == ';') dnodeEp[len - 1] = 0;
len = strlen(dnodeEp); len = (int32_t)strlen(dnodeEp);
snprintf(dnodeEp + len, TSDB_EP_LEN - len, ":%d", tsServerPort); snprintf(dnodeEp + len, TSDB_EP_LEN - len, ":%d", tsServerPort);
} }
ep = dnodeEp; ep = dnodeEp;

View File

@ -136,14 +136,14 @@ int32_t mnodeInitMnodes() {
mnodeMnodeInitLock(); mnodeMnodeInitLock();
SMnodeObj tObj; SMnodeObj tObj;
tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsMnodeUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_MNODE, .id = SDB_TABLE_MNODE,
.name = "mnodes", .name = "mnodes",
.hashSessions = TSDB_DEFAULT_MNODES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_MNODES_HASH_SIZE,
.maxRowSize = tsMnodeUpdateSize, .maxRowSize = tsMnodeUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_INT, .keyType = SDB_KEY_INT,
.fpInsert = mnodeMnodeActionInsert, .fpInsert = mnodeMnodeActionInsert,
.fpDelete = mnodeMnodeActionDelete, .fpDelete = mnodeMnodeActionDelete,
@ -176,7 +176,7 @@ void mnodeCleanupMnodes() {
} }
int32_t mnodeGetMnodesNum() { int32_t mnodeGetMnodesNum() {
return sdbGetNumOfRows(tsMnodeSdb); return (int32_t)sdbGetNumOfRows(tsMnodeSdb);
} }
void *mnodeGetMnode(int32_t mnodeId) { void *mnodeGetMnode(int32_t mnodeId) {

View File

@ -207,7 +207,7 @@ static void sdbRestoreTables() {
(*pTable->fpRestored)(); (*pTable->fpRestored)();
} }
totalRows += pTable->numOfRows; totalRows += (int32_t)pTable->numOfRows;
numOfTables++; numOfTables++;
sdbInfo("vgId:1, sdb:%s is checked, rows:%" PRId64, pTable->name, pTable->numOfRows); sdbInfo("vgId:1, sdb:%s is checked, rows:%" PRId64, pTable->name, pTable->numOfRows);
} }
@ -475,7 +475,7 @@ void sdbIncRef(void *tparam, void *pRow) {
if (pRow == NULL || tparam == NULL) return; if (pRow == NULL || tparam == NULL) return;
SSdbTable *pTable = tparam; SSdbTable *pTable = tparam;
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t * pRefCount = (int32_t *)((char *)pRow + pTable->refCountPos);
int32_t refCount = atomic_add_fetch_32(pRefCount, 1); int32_t refCount = atomic_add_fetch_32(pRefCount, 1);
sdbTrace("vgId:1, sdb:%s, inc ref to row:%p:%s:%d", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); sdbTrace("vgId:1, sdb:%s, inc ref to row:%p:%s:%d", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount);
} }
@ -484,11 +484,11 @@ void sdbDecRef(void *tparam, void *pRow) {
if (pRow == NULL || tparam == NULL) return; if (pRow == NULL || tparam == NULL) return;
SSdbTable *pTable = tparam; SSdbTable *pTable = tparam;
int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t * pRefCount = (int32_t *)((char *)pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("vgId:1, sdb:%s, dec ref to row:%p:%s:%d", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); sdbTrace("vgId:1, sdb:%s, dec ref to row:%p:%s:%d", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount);
int32_t *updateEnd = pRow + pTable->refCountPos - 4; int32_t *updateEnd = (int32_t *)((char *)pRow + pTable->refCountPos - 4);
if (refCount <= 0 && *updateEnd) { if (refCount <= 0 && *updateEnd) {
sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount); sdbTrace("vgId:1, sdb:%s, row:%p:%s:%d destroyed", pTable->name, pRow, sdbGetRowStr(pTable, pRow), refCount);
SSdbRow row = {.pObj = pRow}; SSdbRow row = {.pObj = pRow};
@ -501,7 +501,7 @@ static void *sdbGetRowMeta(SSdbTable *pTable, void *key) {
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key); keySize = (int32_t)strlen((char *)key);
} }
void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize); void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize);
@ -534,7 +534,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) {
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key); keySize = (int32_t)strlen((char *)key);
} }
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
@ -564,7 +564,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) {
} }
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
int32_t *updateEnd = pRow->pObj + pTable->refCountPos - 4; int32_t *updateEnd = (int32_t *)((char*)pRow->pObj + pTable->refCountPos - 4);
bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0; bool set = atomic_val_compare_exchange_32(updateEnd, 0, 1) == 0;
if (!set) { if (!set) {
sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->name, sdbError("vgId:1, sdb:%s, failed to delete key:%s from hash, for it already removed", pTable->name,
@ -577,7 +577,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) {
void * key = sdbGetObjKey(pTable, pRow->pObj); void * key = sdbGetObjKey(pTable, pRow->pObj);
int32_t keySize = sizeof(int32_t); int32_t keySize = sizeof(int32_t);
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
keySize = strlen((char *)key); keySize = (int32_t)strlen((char *)key);
} }
pthread_mutex_lock(&pTable->mutex); pthread_mutex_lock(&pTable->mutex);
@ -764,7 +764,7 @@ bool sdbCheckRowDeleted(void *tparam, void *pRow) {
SSdbTable *pTable = tparam; SSdbTable *pTable = tparam;
if (pTable == NULL) return false; if (pTable == NULL) return false;
int32_t *updateEnd = pRow + pTable->refCountPos - 4; int32_t *updateEnd = (int32_t *)((char*)pRow + pTable->refCountPos - 4);
return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1; return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1;
} }
@ -942,14 +942,14 @@ static int32_t sdbInitWorker() {
static void sdbCleanupWorker() { static void sdbCleanupWorker() {
for (int32_t i = 0; i < tsSdbPool.num; ++i) { for (int32_t i = 0; i < tsSdbPool.num; ++i) {
SSdbWorker *pWorker = tsSdbPool.worker + i; SSdbWorker *pWorker = tsSdbPool.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsSdbWQset); taosQsetThreadResume(tsSdbWQset);
} }
} }
for (int32_t i = 0; i < tsSdbPool.num; ++i) { for (int32_t i = 0; i < tsSdbPool.num; ++i) {
SSdbWorker *pWorker = tsSdbPool.worker + i; SSdbWorker *pWorker = tsSdbPool.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }

View File

@ -218,7 +218,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
} }
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = size; pMsg->rpcRsp.len = size;

View File

@ -222,23 +222,23 @@ static int32_t mnodeChildTableActionEncode(SSdbRow *pRow) {
SCTableObj *pTable = pRow->pObj; SCTableObj *pTable = pRow->pObj;
assert(pTable != NULL && pRow->rowData != NULL); assert(pTable != NULL && pRow->rowData != NULL);
int32_t len = strlen(pTable->info.tableId); int32_t len = (int32_t)strlen(pTable->info.tableId);
if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID; if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID;
memcpy(pRow->rowData, pTable->info.tableId, len); memcpy(pRow->rowData, pTable->info.tableId, len);
memset(pRow->rowData + len, 0, 1); memset((char *)pRow->rowData + len, 0, 1);
len++; len++;
memcpy(pRow->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); memcpy((char *)pRow->rowData + len, (char *)pTable + sizeof(char *), tsChildTableUpdateSize);
len += tsChildTableUpdateSize; len += tsChildTableUpdateSize;
if (pTable->info.type != TSDB_CHILD_TABLE) { if (pTable->info.type != TSDB_CHILD_TABLE) {
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
memcpy(pRow->rowData + len, pTable->schema, schemaSize); memcpy((char *)pRow->rowData + len, pTable->schema, schemaSize);
len += schemaSize; len += schemaSize;
if (pTable->sqlLen != 0) { if (pTable->sqlLen != 0) {
memcpy(pRow->rowData + len, pTable->sql, pTable->sqlLen); memcpy((char *)pRow->rowData + len, pTable->sql, pTable->sqlLen);
len += pTable->sqlLen; len += pTable->sqlLen;
} }
} }
@ -253,7 +253,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) {
SCTableObj *pTable = calloc(1, sizeof(SCTableObj)); SCTableObj *pTable = calloc(1, sizeof(SCTableObj));
if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = strlen(pRow->rowData); int32_t len = (int32_t)strlen(pRow->rowData);
if (len >= TSDB_TABLE_FNAME_LEN) { if (len >= TSDB_TABLE_FNAME_LEN) {
free(pTable); free(pTable);
return TSDB_CODE_MND_INVALID_TABLE_ID; return TSDB_CODE_MND_INVALID_TABLE_ID;
@ -261,7 +261,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) {
pTable->info.tableId = strdup(pRow->rowData); pTable->info.tableId = strdup(pRow->rowData);
len++; len++;
memcpy((char*)pTable + sizeof(char *), pRow->rowData + len, tsChildTableUpdateSize); memcpy((char *)pTable + sizeof(char *), (char *)pRow->rowData + len, tsChildTableUpdateSize);
len += tsChildTableUpdateSize; len += tsChildTableUpdateSize;
if (pTable->info.type != TSDB_CHILD_TABLE) { if (pTable->info.type != TSDB_CHILD_TABLE) {
@ -271,7 +271,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) {
mnodeDestroyChildTable(pTable); mnodeDestroyChildTable(pTable);
return TSDB_CODE_MND_INVALID_TABLE_TYPE; return TSDB_CODE_MND_INVALID_TABLE_TYPE;
} }
memcpy(pTable->schema, pRow->rowData + len, schemaSize); memcpy(pTable->schema, (char *)pRow->rowData + len, schemaSize);
len += schemaSize; len += schemaSize;
if (pTable->sqlLen != 0) { if (pTable->sqlLen != 0) {
@ -280,7 +280,7 @@ static int32_t mnodeChildTableActionDecode(SSdbRow *pRow) {
mnodeDestroyChildTable(pTable); mnodeDestroyChildTable(pTable);
return TSDB_CODE_MND_OUT_OF_MEMORY; return TSDB_CODE_MND_OUT_OF_MEMORY;
} }
memcpy(pTable->sql, pRow->rowData + len, pTable->sqlLen); memcpy(pTable->sql, (char *)pRow->rowData + len, pTable->sqlLen);
} }
} }
@ -352,14 +352,14 @@ static int32_t mnodeChildTableActionRestored() {
static int32_t mnodeInitChildTables() { static int32_t mnodeInitChildTables() {
SCTableObj tObj; SCTableObj tObj;
tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; tsChildTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_CTABLE, .id = SDB_TABLE_CTABLE,
.name = "ctables", .name = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
.maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE, .maxRowSize = sizeof(SCTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING, .keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeChildTableActionInsert, .fpInsert = mnodeChildTableActionInsert,
.fpDelete = mnodeChildTableActionDelete, .fpDelete = mnodeChildTableActionDelete,
@ -501,18 +501,18 @@ static int32_t mnodeSuperTableActionEncode(SSdbRow *pRow) {
SSTableObj *pStable = pRow->pObj; SSTableObj *pStable = pRow->pObj;
assert(pRow->pObj != NULL && pRow->rowData != NULL); assert(pRow->pObj != NULL && pRow->rowData != NULL);
int32_t len = strlen(pStable->info.tableId); int32_t len = (int32_t)strlen(pStable->info.tableId);
if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID; if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID;
memcpy(pRow->rowData, pStable->info.tableId, len); memcpy(pRow->rowData, pStable->info.tableId, len);
memset(pRow->rowData + len, 0, 1); memset((char *)pRow->rowData + len, 0, 1);
len++; len++;
memcpy(pRow->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); memcpy((char *)pRow->rowData + len, (char *)pStable + sizeof(char *), tsSuperTableUpdateSize);
len += tsSuperTableUpdateSize; len += tsSuperTableUpdateSize;
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
memcpy(pRow->rowData + len, pStable->schema, schemaSize); memcpy((char *)pRow->rowData + len, pStable->schema, schemaSize);
len += schemaSize; len += schemaSize;
pRow->rowSize = len; pRow->rowSize = len;
@ -525,7 +525,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj)); SSTableObj *pStable = (SSTableObj *) calloc(1, sizeof(SSTableObj));
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY; if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = strlen(pRow->rowData); int32_t len = (int32_t)strlen(pRow->rowData);
if (len >= TSDB_TABLE_FNAME_LEN){ if (len >= TSDB_TABLE_FNAME_LEN){
free(pStable); free(pStable);
return TSDB_CODE_MND_INVALID_TABLE_ID; return TSDB_CODE_MND_INVALID_TABLE_ID;
@ -533,7 +533,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
pStable->info.tableId = strdup(pRow->rowData); pStable->info.tableId = strdup(pRow->rowData);
len++; len++;
memcpy((char*)pStable + sizeof(char *), pRow->rowData + len, tsSuperTableUpdateSize); memcpy((char *)pStable + sizeof(char *), (char *)pRow->rowData + len, tsSuperTableUpdateSize);
len += tsSuperTableUpdateSize; len += tsSuperTableUpdateSize;
int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags);
@ -543,7 +543,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbRow *pRow) {
return TSDB_CODE_MND_NOT_SUPER_TABLE; return TSDB_CODE_MND_NOT_SUPER_TABLE;
} }
memcpy(pStable->schema, pRow->rowData + len, schemaSize); memcpy(pStable->schema, (char *)pRow->rowData + len, schemaSize);
pRow->pObj = pStable; pRow->pObj = pStable;
@ -556,14 +556,14 @@ static int32_t mnodeSuperTableActionRestored() {
static int32_t mnodeInitSuperTables() { static int32_t mnodeInitSuperTables() {
SSTableObj tObj; SSTableObj tObj;
tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; tsSuperTableUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_STABLE, .id = SDB_TABLE_STABLE,
.name = "stables", .name = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
.maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN, .maxRowSize = sizeof(SSTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_VAR_STRING, .keyType = SDB_KEY_VAR_STRING,
.fpInsert = mnodeSuperTableActionInsert, .fpInsert = mnodeSuperTableActionInsert,
.fpDelete = mnodeSuperTableActionDelete, .fpDelete = mnodeSuperTableActionDelete,
@ -1266,7 +1266,7 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
} }
// int32_t rowSize = 0; // int32_t rowSize = 0;
uint32_t len = strlen(newTagName); uint32_t len = (int32_t)strlen(newTagName);
if (len >= TSDB_COL_NAME_LEN) { if (len >= TSDB_COL_NAME_LEN) {
return TSDB_CODE_MND_COL_NAME_TOO_LONG; return TSDB_CODE_MND_COL_NAME_TOO_LONG;
} }
@ -1429,7 +1429,7 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
} }
// int32_t rowSize = 0; // int32_t rowSize = 0;
uint32_t len = strlen(newName); uint32_t len = (uint32_t)strlen(newName);
if (len >= TSDB_COL_NAME_LEN) { if (len >= TSDB_COL_NAME_LEN) {
return TSDB_CODE_MND_COL_NAME_TOO_LONG; return TSDB_CODE_MND_COL_NAME_TOO_LONG;
} }
@ -1534,7 +1534,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
tstrncpy(prefix, pDb->name, 64); tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix); prefixLen = (int32_t)strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char stableName[TSDB_TABLE_NAME_LEN] = {0}; char stableName[TSDB_TABLE_NAME_LEN] = {0};
@ -1559,7 +1559,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
int16_t len = strnlen(stableName, TSDB_TABLE_NAME_LEN - 1); int16_t len = (int16_t)strnlen(stableName, TSDB_TABLE_NAME_LEN - 1);
*(int16_t*) pWrite = len; *(int16_t*) pWrite = len;
pWrite += sizeof(int16_t); // todo refactor pWrite += sizeof(int16_t); // todo refactor
@ -1602,7 +1602,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
char prefix[64] = {0}; char prefix[64] = {0};
tstrncpy(prefix, pDropDb->name, 64); tstrncpy(prefix, pDropDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
mInfo("db:%s, all super tables will be dropped from sdb", pDropDb->name); mInfo("db:%s, all super tables will be dropped from sdb", pDropDb->name);
@ -1755,9 +1755,9 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
rpcFreeCont(pRsp); rpcFreeCont(pRsp);
return TSDB_CODE_MND_INVALID_TABLE_NAME; return TSDB_CODE_MND_INVALID_TABLE_NAME;
} else { } else {
pRsp->numOfTables = htonl(pRsp->numOfTables); pRsp->numOfTables = (int32_t)htonl(pRsp->numOfTables);
pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = msg - (char *)pRsp; pMsg->rpcRsp.len = (int32_t)((char *)msg - (char *)pRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2030,7 +2030,7 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
//SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here. //SCMCreateTableMsg* p1 = pMsg->rpcMsg.pCont; // there are several tables here.
SCreateTableMsg* pCreate = (SCreateTableMsg*)(pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg)); SCreateTableMsg* pCreate = (SCreateTableMsg*)((char *)pMsg->rpcMsg.pCont + sizeof(SCMCreateTableMsg));
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -2287,7 +2287,7 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
} }
// int32_t rowSize = 0; // int32_t rowSize = 0;
uint32_t len = strlen(newName); uint32_t len = (uint32_t)strlen(newName);
if (len >= TSDB_COL_NAME_LEN) { if (len >= TSDB_COL_NAME_LEN) {
return TSDB_CODE_MND_COL_NAME_TOO_LONG; return TSDB_CODE_MND_COL_NAME_TOO_LONG;
} }
@ -2491,7 +2491,7 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
char prefix[64] = {0}; char prefix[64] = {0};
tstrncpy(prefix, pDropDb->name, 64); tstrncpy(prefix, pDropDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
mInfo("db:%s, all child tables will be dropped from sdb", pDropDb->name); mInfo("db:%s, all child tables will be dropped from sdb", pDropDb->name);
@ -2907,7 +2907,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0}; char prefix[64] = {0};
int32_t prefixLen = tableIdPrefix(pDb->name, prefix, 64); int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64);
char* pattern = NULL; char* pattern = NULL;
if (pShow->payloadLen > 0) { if (pShow->payloadLen > 0) {
@ -3143,7 +3143,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
char prefix[64] = {0}; char prefix[64] = {0};
tstrncpy(prefix, pDb->name, 64); tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable);

View File

@ -128,7 +128,7 @@ static void mnodePrintUserAuth() {
} }
static int32_t mnodeUserActionRestored() { static int32_t mnodeUserActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsUserSdb); int64_t numOfRows = sdbGetNumOfRows(tsUserSdb);
if (numOfRows <= 0 && dnodeIsFirstDeploy()) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create root user"); mInfo("dnode first deploy, create root user");
SAcctObj *pAcct = mnodeGetAcct(TSDB_DEFAULT_USER); SAcctObj *pAcct = mnodeGetAcct(TSDB_DEFAULT_USER);
@ -148,14 +148,14 @@ static int32_t mnodeUserActionRestored() {
int32_t mnodeInitUsers() { int32_t mnodeInitUsers() {
SUserObj tObj; SUserObj tObj;
tsUserUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsUserUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_USER, .id = SDB_TABLE_USER,
.name = "users", .name = "users",
.hashSessions = TSDB_DEFAULT_USERS_HASH_SIZE, .hashSessions = TSDB_DEFAULT_USERS_HASH_SIZE,
.maxRowSize = tsUserUpdateSize, .maxRowSize = tsUserUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_STRING, .keyType = SDB_KEY_STRING,
.fpInsert = mnodeUserActionInsert, .fpInsert = mnodeUserActionInsert,
.fpDelete = mnodeUserActionDelete, .fpDelete = mnodeUserActionDelete,
@ -204,11 +204,11 @@ void mnodeCancelGetNextUser(void *pIter) {
} }
void mnodeIncUserRef(SUserObj *pUser) { void mnodeIncUserRef(SUserObj *pUser) {
return sdbIncRef(tsUserSdb, pUser); sdbIncRef(tsUserSdb, pUser);
} }
void mnodeDecUserRef(SUserObj *pUser) { void mnodeDecUserRef(SUserObj *pUser) {
return sdbDecRef(tsUserSdb, pUser); sdbDecRef(tsUserSdb, pUser);
} }
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
@ -561,7 +561,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) {
void mnodeDropAllUsers(SAcctObj *pAcct) { void mnodeDropAllUsers(SAcctObj *pAcct) {
void * pIter = NULL; void * pIter = NULL;
int32_t numOfUsers = 0; int32_t numOfUsers = 0;
int32_t acctNameLen = strlen(pAcct->user); int32_t acctNameLen = (int32_t)strlen(pAcct->user);
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
while (1) { while (1) {

View File

@ -206,14 +206,14 @@ static int32_t mnodeVgroupActionRestored() {
int32_t mnodeInitVgroups() { int32_t mnodeInitVgroups() {
SVgObj tObj; SVgObj tObj;
tsVgUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; tsVgUpdateSize = (int32_t)((int8_t *)tObj.updateEnd - (int8_t *)&tObj);
SSdbTableDesc desc = { SSdbTableDesc desc = {
.id = SDB_TABLE_VGROUP, .id = SDB_TABLE_VGROUP,
.name = "vgroups", .name = "vgroups",
.hashSessions = TSDB_DEFAULT_VGROUPS_HASH_SIZE, .hashSessions = TSDB_DEFAULT_VGROUPS_HASH_SIZE,
.maxRowSize = tsVgUpdateSize, .maxRowSize = tsVgUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, .refCountPos = (int32_t)((int8_t *)(&tObj.refCount) - (int8_t *)&tObj),
.keyType = SDB_KEY_AUTO, .keyType = SDB_KEY_AUTO,
.fpInsert = mnodeVgroupActionInsert, .fpInsert = mnodeVgroupActionInsert,
.fpDelete = mnodeVgroupActionDelete, .fpDelete = mnodeVgroupActionDelete,
@ -245,11 +245,11 @@ int32_t mnodeInitVgroups() {
} }
void mnodeIncVgroupRef(SVgObj *pVgroup) { void mnodeIncVgroupRef(SVgObj *pVgroup) {
return sdbIncRef(tsVgroupSdb, pVgroup); sdbIncRef(tsVgroupSdb, pVgroup);
} }
void mnodeDecVgroupRef(SVgObj *pVgroup) { void mnodeDecVgroupRef(SVgObj *pVgroup) {
return sdbDecRef(tsVgroupSdb, pVgroup); sdbDecRef(tsVgroupSdb, pVgroup);
} }
SVgObj *mnodeGetVgroup(int32_t vgId) { SVgObj *mnodeGetVgroup(int32_t vgId) {

View File

@ -37,7 +37,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP
} }
// TAOS_OS_FUNC_FILE_SENDIFLE // TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size);
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size); int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
#ifdef TAOS_RANDOM_FILE_FAIL #ifdef TAOS_RANDOM_FILE_FAIL

View File

@ -61,6 +61,7 @@ extern "C" {
int32_t taosSetNonblocking(SOCKET sock, int32_t on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE(); void taosIgnSIGPIPE();
void taosBlockSIGPIPE(); void taosBlockSIGPIPE();
void taosSetMaskSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);

View File

@ -121,7 +121,7 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE #ifndef TAOS_OS_FUNC_FILE_SENDIFLE
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size) {
int64_t leftbytes = size; int64_t leftbytes = size;
int64_t sentbytes; int64_t sentbytes;

View File

@ -53,6 +53,16 @@ void taosBlockSIGPIPE() {
} }
} }
void taosSetMaskSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
if (rc != 0) {
uError("failed to setmask SIGPIPE");
}
}
#endif #endif
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT

View File

@ -78,7 +78,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
return writeLen; return writeLen;
} }
int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) { int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t* offset, int64_t size) {
uError("taosSendFile no implemented yet"); uError("taosSendFile no implemented yet");
return 0; return 0;
} }

View File

@ -48,6 +48,7 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
void taosIgnSIGPIPE() {} void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {} void taosBlockSIGPIPE() {}
void taosSetMaskSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {

View File

@ -9,7 +9,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_LIBRARY(http ${SRC}) ADD_LIBRARY(http ${SRC})
TARGET_LINK_LIBRARIES(http z) TARGET_LINK_LIBRARIES(http z)

View File

@ -166,7 +166,7 @@ typedef struct HttpThread {
HttpContext * pHead; HttpContext * pHead;
pthread_mutex_t threadMutex; pthread_mutex_t threadMutex;
bool stop; bool stop;
int32_t pollFd; SOCKET pollFd;
int32_t numOfContexts; int32_t numOfContexts;
int32_t threadId; int32_t threadId;
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE];
@ -177,7 +177,7 @@ typedef struct HttpServer {
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE];
uint32_t serverIp; uint32_t serverIp;
uint16_t serverPort; uint16_t serverPort;
int32_t fd; SOCKET fd;
int32_t numOfThreads; int32_t numOfThreads;
int32_t methodScannerLen; int32_t methodScannerLen;
int32_t requestNum; int32_t requestNum;

View File

@ -158,7 +158,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
if (row[i]!= NULL){ if (row[i]!= NULL){
len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name); len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name);
memcpy(target + len, (char *) row[i], length[i]); memcpy(target + len, (char *) row[i], length[i]);
len = strlen(target); len = (int32_t)strlen(target);
} }
break; break;
default: default:

View File

@ -133,7 +133,7 @@ int32_t ehttp_gzip_write(ehttp_gzip_t *gzip, const char *buf, int32_t len) {
if (ret!=Z_STREAM_END) continue; if (ret!=Z_STREAM_END) continue;
} }
int32_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef*)gzip->chunk);
gzip->gzip->next_out[0] = '\0'; gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);
@ -155,7 +155,7 @@ int32_t ehttp_gzip_finish(ehttp_gzip_t *gzip) {
if (ret != Z_STREAM_END) return -1; if (ret != Z_STREAM_END) return -1;
int32_t len = gzip->gzip->next_out - (z_const Bytef*)gzip->chunk; int32_t len = (int32_t)(gzip->gzip->next_out - (z_const Bytef*)gzip->chunk);
gzip->gzip->next_out[0] = '\0'; gzip->gzip->next_out[0] = '\0';
gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len); gzip->callbacks.on_data(gzip, gzip->arg, gzip->chunk, len);

View File

@ -93,7 +93,7 @@ int32_t httpWriteBufNoTrace(struct HttpContext *pContext, const char *buf, int32
int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) { int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
int32_t remain = 0; int32_t remain = 0;
char sLen[24]; char sLen[24];
uint64_t srcLen = (uint64_t) (buf->lst - buf->buf); int32_t srcLen = (int32_t) (buf->lst - buf->buf);
if (buf->pContext->fd <= 0) { if (buf->pContext->fd <= 0) {
httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd); httpTrace("context:%p, fd:%d, write json body error", buf->pContext, buf->pContext->fd);
@ -113,11 +113,11 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd); httpTrace("context:%p, fd:%d, no data need dump", buf->pContext, buf->pContext->fd);
return 0; // there is no data to dump. return 0; // there is no data to dump.
} else { } else {
int32_t len = sprintf(sLen, "%" PRIx64 "\r\n", srcLen); int32_t len = sprintf(sLen, "%d\r\n", srcLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", response:\n%s", buf->pContext, buf->pContext->fd, httpTrace("context:%p, fd:%d, write body, chunkSize:%d, response:\n%s", buf->pContext, buf->pContext->fd,
srcLen, buf->buf); srcLen, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len); httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, buf->buf, (int32_t)srcLen); remain = httpWriteBufNoTrace(buf->pContext, buf->buf, srcLen);
} }
} else { } else {
char compressBuf[JSON_BUFFER_SIZE] = {0}; char compressBuf[JSON_BUFFER_SIZE] = {0};
@ -126,7 +126,7 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
if (ret == 0) { if (ret == 0) {
if (compressBufLen > 0) { if (compressBufLen > 0) {
int32_t len = sprintf(sLen, "%x\r\n", compressBufLen); int32_t len = sprintf(sLen, "%x\r\n", compressBufLen);
httpTrace("context:%p, fd:%d, write body, chunkSize:%" PRIu64 ", compressSize:%d, last:%d, response:\n%s", httpTrace("context:%p, fd:%d, write body, chunkSize:%d, compressSize:%d, last:%d, response:\n%s",
buf->pContext, buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf); buf->pContext, buf->pContext->fd, srcLen, compressBufLen, isTheLast, buf->buf);
httpWriteBufNoTrace(buf->pContext, sLen, len); httpWriteBufNoTrace(buf->pContext, sLen, len);
remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen); remain = httpWriteBufNoTrace(buf->pContext, (const char*)compressBuf, compressBufLen);
@ -136,7 +136,7 @@ int32_t httpWriteJsonBufBody(JsonBuf* buf, bool isTheLast) {
remain = 0; // there is no data to dump. remain = 0; // there is no data to dump.
} }
} else { } else {
httpError("context:%p, fd:%d, failed to compress data, chunkSize:%" PRIu64 ", last:%d, error:%d, response:\n%s", httpError("context:%p, fd:%d, failed to compress data, chunkSize:%d, last:%d, error:%d, response:\n%s",
buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf); buf->pContext, buf->pContext->fd, srcLen, isTheLast, ret, buf->buf);
remain = 0; remain = 0;
} }

View File

@ -153,7 +153,7 @@ static int32_t httpOnRequestLine(HttpParser *pParser, char *method, char *target
for (int32_t i = 0; i < HTTP_MAX_URL; i++) { for (int32_t i = 0; i < HTTP_MAX_URL; i++) {
char *pSeek = strchr(pStart, '/'); char *pSeek = strchr(pStart, '/');
if (pSeek == NULL) { if (pSeek == NULL) {
(void)httpAppendString(pParser->path + i, pStart, strlen(pStart)); (void)httpAppendString(pParser->path + i, pStart, (int32_t)strlen(pStart));
break; break;
} else { } else {
(void)httpAppendString(pParser->path + i, pStart, (int32_t)(pSeek - pStart)); (void)httpAppendString(pParser->path + i, pStart, (int32_t)(pSeek - pStart));
@ -285,7 +285,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const
free(t); free(t);
free(s); free(s);
httpTrace("context:%p, fd:%d, basic auth:%s", pContext, pContext->fd, parser->authContent); httpTrace("context:%p, fd:%d, basic auth:%s", pContext, pContext->fd, parser->authContent);
int32_t ok = httpParseBasicAuthToken(pContext, parser->authContent, strlen(parser->authContent)); int32_t ok = httpParseBasicAuthToken(pContext, parser->authContent, (int32_t)strlen(parser->authContent));
if (ok != 0) { if (ok != 0) {
httpOnError(parser, 0, TSDB_CODE_HTTP_INVALID_BASIC_AUTH); httpOnError(parser, 0, TSDB_CODE_HTTP_INVALID_BASIC_AUTH);
return -1; return -1;
@ -299,7 +299,7 @@ static int32_t httpOnParseHeaderField(HttpParser *parser, const char *key, const
free(t); free(t);
free(s); free(s);
httpTrace("context:%p, fd:%d, taosd auth:%s", pContext, pContext->fd, parser->authContent); httpTrace("context:%p, fd:%d, taosd auth:%s", pContext, pContext->fd, parser->authContent);
int32_t ok = httpParseTaosdAuthToken(pContext, parser->authContent, strlen(parser->authContent)); int32_t ok = httpParseTaosdAuthToken(pContext, parser->authContent, (int32_t)strlen(parser->authContent));
if (ok != 0) { if (ok != 0) {
httpOnError(parser, 0, TSDB_CODE_HTTP_INVALID_TAOSD_AUTH); httpOnError(parser, 0, TSDB_CODE_HTTP_INVALID_TAOSD_AUTH);
return -1; return -1;
@ -524,14 +524,14 @@ char *httpDecodeUrl(const char *enc) {
int32_t hex, cnt; int32_t hex, cnt;
int32_t n = sscanf(p+1, "%2x%n", &hex, &cnt); int32_t n = sscanf(p+1, "%2x%n", &hex, &cnt);
if (n!=1 && cnt !=2) { ok = 0; break; } if (n!=1 && cnt !=2) { ok = 0; break; }
if (httpAppendString(&str, enc, p-enc)) { ok = 0; break; } if (httpAppendString(&str, enc, (int32_t)(p-enc))) { ok = 0; break; }
char c = (char)hex; char c = (char)hex;
if (httpAppendString(&str, &c, 1)) { ok = 0; break; } if (httpAppendString(&str, &c, 1)) { ok = 0; break; }
enc = p+3; enc = p+3;
} }
char *dec = NULL; char *dec = NULL;
if (ok && *enc) { if (ok && *enc) {
if (httpAppendString(&str, enc, strlen(enc))) { ok = 0; } if (httpAppendString(&str, enc, (int32_t)strlen(enc))) { ok = 0; }
} }
if (ok) { if (ok) {
dec = str.str; dec = str.str;
@ -667,7 +667,7 @@ static int32_t httpParserOnVersion(HttpParser *parser, HTTP_PARSER_STATE state,
int32_t ok = 0; int32_t ok = 0;
do { do {
const char *prefix = "HTTP/1."; const char *prefix = "HTTP/1.";
int32_t len = strlen(prefix); int32_t len = (int32_t)strlen(prefix);
if (parser->str.pos < len) { if (parser->str.pos < len) {
if (prefix[parser->str.pos] != c) { if (prefix[parser->str.pos] != c) {
httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c);
@ -811,7 +811,7 @@ static int32_t httpParserOnCrlf(HttpParser *parser, HTTP_PARSER_STATE state, con
int32_t ok = 0; int32_t ok = 0;
do { do {
const char *s = "\r\n"; const char *s = "\r\n";
int32_t len = strlen(s); int32_t len = (int32_t)strlen(s);
if (s[parser->str.pos] != c) { if (s[parser->str.pos] != c) {
httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c); httpError("context:%p, fd:%d, parser state:%d, unexpected char:[%c]%02x", pContext, pContext->fd, state, c, c);
ok = -1; ok = -1;

View File

@ -134,14 +134,14 @@ void httpCleanupResultQueue() {
for (int32_t i = 0; i < tsHttpPool.num; ++i) { for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i; SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsHttpQset); taosQsetThreadResume(tsHttpQset);
} }
} }
for (int32_t i = 0; i < tsHttpPool.num; ++i) { for (int32_t i = 0; i < tsHttpPool.num; ++i) {
SHttpWorker *pWorker = tsHttpPool.httpWorker + i; SHttpWorker *pWorker = tsHttpPool.httpWorker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }

View File

@ -25,10 +25,6 @@
#include "httpResp.h" #include "httpResp.h"
#include "httpUtil.h" #include "httpUtil.h"
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
static bool httpReadData(HttpContext *pContext); static bool httpReadData(HttpContext *pContext);
static void httpStopThread(HttpThread* pThread) { static void httpStopThread(HttpThread* pThread) {
@ -49,10 +45,10 @@ static void httpStopThread(HttpThread* pThread) {
pthread_join(pThread->thread, NULL); pthread_join(pThread->thread, NULL);
if (fd != -1) { if (fd != -1) {
close(fd); taosCloseSocket(fd);
} }
close(pThread->pollFd); taosCloseSocket(pThread->pollFd);
pthread_mutex_destroy(&(pThread->threadMutex)); pthread_mutex_destroy(&(pThread->threadMutex));
} }
@ -77,10 +73,7 @@ static void httpProcessHttpData(void *param) {
HttpContext *pContext; HttpContext *pContext;
int32_t fdNum; int32_t fdNum;
sigset_t set; taosSetMaskSIGPIPE();
sigemptyset(&set);
sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL);
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
@ -162,10 +155,7 @@ static void *httpAcceptHttpConnection(void *arg) {
HttpContext * pContext = NULL; HttpContext * pContext = NULL;
int32_t totalFds = 0; int32_t totalFds = 0;
sigset_t set; taosSetMaskSIGPIPE();
sigemptyset(&set);
sigaddset(&set, SIGPIPE);
pthread_sigmask(SIG_SETMASK, &set, NULL);
pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort); pServer->fd = taosOpenTcpServerSocket(pServer->serverIp, pServer->serverPort);
@ -242,7 +232,7 @@ static void *httpAcceptHttpConnection(void *arg) {
threadId = threadId % pServer->numOfThreads; threadId = threadId % pServer->numOfThreads;
} }
close(pServer->fd); taosCloseSocket(pServer->fd);
return NULL; return NULL;
} }
@ -265,7 +255,7 @@ bool httpInitConnect() {
return false; return false;
} }
pThread->pollFd = epoll_create(HTTP_MAX_EVENTS); // size does not matter pThread->pollFd = (SOCKET)epoll_create(HTTP_MAX_EVENTS); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
httpError("http thread:%s, failed to create HTTP epoll", pThread->label); httpError("http thread:%s, failed to create HTTP epoll", pThread->label);
pthread_mutex_destroy(&(pThread->threadMutex)); pthread_mutex_destroy(&(pThread->threadMutex));

View File

@ -276,7 +276,7 @@ int32_t tgReadSchema(char *fileName) {
rewind(fp); rewind(fp);
char * content = (char *)calloc(contentSize + 1, 1); char * content = (char *)calloc(contentSize + 1, 1);
int32_t result = fread(content, 1, contentSize, fp); int32_t result = (int32_t)fread(content, 1, contentSize, fp);
if (result != contentSize) { if (result != contentSize) {
httpError("failed to read telegraf schema file:%s", fileName); httpError("failed to read telegraf schema file:%s", fileName);

View File

@ -388,7 +388,7 @@ int32_t httpGzipDeCompress(char *srcData, int32_t nSrcData, char *destData, int3
if (inflateEnd(&gzipStream) != Z_OK) { if (inflateEnd(&gzipStream) != Z_OK) {
return -4; return -4;
} }
*nDestData = gzipStream.total_out; *nDestData = (int32_t)gzipStream.total_out;
return 0; return 0;
} }
@ -417,7 +417,7 @@ int32_t httpGzipCompress(HttpContext *pContext, char *srcData, int32_t nSrcData,
return -1; return -1;
} }
int32_t cacheLen = pContext->gzipStream.total_out - lastTotalLen; int32_t cacheLen = (int32_t)(pContext->gzipStream.total_out - lastTotalLen);
if (cacheLen >= *nDestData) { if (cacheLen >= *nDestData) {
return -2; return -2;
} }

View File

@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(./src SRC) AUX_SOURCE_DIRECTORY(./src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_LIBRARY(monitor ${SRC}) ADD_LIBRARY(monitor ${SRC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)

View File

@ -79,8 +79,8 @@ int32_t monInitSystem() {
strcpy(tsMonitor.ep, tsLocalEp); strcpy(tsMonitor.ep, tsLocalEp);
} }
int len = strlen(tsMonitor.ep); int32_t len = (int32_t)strlen(tsMonitor.ep);
for (int i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') { if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') {
tsMonitor.ep[i] = '_'; tsMonitor.ep[i] = '_';
} }
@ -148,7 +148,7 @@ static void *monThreadFunc(void *param) {
} }
if (tsMonitor.state == MON_STATE_NOT_INIT) { if (tsMonitor.state == MON_STATE_NOT_INIT) {
int code = 0; int32_t code = 0;
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) { for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
monBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); monBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
@ -330,7 +330,7 @@ static void monSaveSystemInfo() {
pos += monBuildReqSql(sql + pos); pos += monBuildReqSql(sql + pos);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql); void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int code = taos_errno(res); int32_t code = taos_errno(res);
taos_free_result(res); taos_free_result(res);
if (code != 0) { if (code != 0) {

View File

@ -132,7 +132,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
break; break;
} }
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
code = -1; code = -1;

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
LIST(REMOVE_ITEM SRC src/syncArbitrator.c) LIST(REMOVE_ITEM SRC src/syncArbitrator.c)
ADD_LIBRARY(sync ${SRC}) ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common) TARGET_LINK_LIBRARIES(sync tutil pthread common)

View File

@ -82,8 +82,8 @@ typedef struct SsyncPeer {
uint64_t sversion; // track the peer version in retrieve process uint64_t sversion; // track the peer version in retrieve process
uint64_t lastFileVer; // track the file version while retrieve uint64_t lastFileVer; // track the file version while retrieve
uint64_t lastWalVer; // track the wal version while retrieve uint64_t lastWalVer; // track the wal version while retrieve
int32_t syncFd; SOCKET syncFd;
int32_t peerFd; // forward FD SOCKET peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process int32_t fileChanged; // a flag to indicate file is changed during retrieving process
int32_t refCount; int32_t refCount;

View File

@ -27,12 +27,12 @@ typedef struct {
int32_t bufferSize; int32_t bufferSize;
void (*processBrokenLink)(int64_t handleId); void (*processBrokenLink)(int64_t handleId);
int32_t (*processIncomingMsg)(int64_t handleId, void *buffer); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer);
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(SOCKET fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void syncCloseTcpThreadPool(void *); void syncCloseTcpThreadPool(void *);
void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd); void *syncAllocateTcpConn(void *, int64_t rid, SOCKET connFd);
void syncFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -27,8 +27,12 @@
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h" #include "syncTcp.h"
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); #ifndef SIGHUP
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); #define SIGHUP SIGTERM
#endif
static void arbSignalHandler(int32_t signum);
static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(int64_t rid); static void arbProcessBrokenLink(int64_t rid);
static int32_t arbProcessPeerMsg(int64_t rid, void *buffer); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
@ -36,7 +40,7 @@ static void * tsArbTcpPool;
typedef struct { typedef struct {
char id[TSDB_EP_LEN + 24]; char id[TSDB_EP_LEN + 24];
int32_t nodeFd; SOCKET nodeFd;
void * pConn; void * pConn;
} SNodeConn; } SNodeConn;
@ -70,8 +74,9 @@ int32_t main(int32_t argc, char *argv[]) {
/* Set termination handler. */ /* Set termination handler. */
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_flags = SA_SIGINFO; memset(&act, 0, sizeof(struct sigaction));
act.sa_sigaction = arbSignalHandler;
act.sa_handler = arbSignalHandler;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL); sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL); sigaction(SIGINT, &act, NULL);
@ -103,12 +108,11 @@ int32_t main(int32_t argc, char *argv[]) {
syncCloseTcpThreadPool(tsArbTcpPool); syncCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down"); sInfo("TAOS arbitrator is shut down");
closelog();
return 0; return 0;
} }
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
@ -172,15 +176,18 @@ static int32_t arbProcessPeerMsg(int64_t rid, void *buffer) {
return 0; return 0;
} }
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { static void arbSignalHandler(int32_t signum) {
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL); sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL); sigaction(SIGINT, &act, NULL);
#ifndef WINDOWS
sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
#else
sInfo("shut down signal is %d", signum);
#endif
// inform main thread to exit // inform main thread to exit
tsem_post(&tsArbSem); tsem_post(&tsArbSem);
} }

View File

@ -45,7 +45,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId);
static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId);
static void syncProcessBrokenLink(int64_t rid); static void syncProcessBrokenLink(int64_t rid);
static int32_t syncProcessPeerMsg(int64_t rid, void *buffer); static int32_t syncProcessPeerMsg(int64_t rid, void *buffer);
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer); static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode); static void syncAddArbitrator(SSyncNode *pNode);
static void syncFreeNode(void *); static void syncFreeNode(void *);
@ -544,7 +544,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd); sDebug("%s, pfd:%d sfd:%d will be closed", pPeer->id, pPeer->peerFd, pPeer->syncFd);
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
taosClose(pPeer->syncFd); taosCloseSocket(pPeer->syncFd);
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
void *pConn = pPeer->pConn; void *pConn = pPeer->pConn;
@ -869,7 +869,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
if (nodeRole != TAOS_SYNC_ROLE_MASTER) { if (nodeRole != TAOS_SYNC_ROLE_MASTER) {
sError("%s, I am not master anymore", pPeer->id); sError("%s, I am not master anymore", pPeer->id);
taosClose(pPeer->syncFd); taosCloseSocket(pPeer->syncFd);
return; return;
} }
@ -1114,7 +1114,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
return; return;
} }
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); SOCKET connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
@ -1132,7 +1132,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
if (pPeer->isArb) tsArbOnline = 1; if (pPeer->isArb) tsArbOnline = 1;
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd); taosCloseSocket(connFd);
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
@ -1171,7 +1171,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_INIT; nodeSStatus = TAOS_SYNC_STATUS_INIT;
sError("%s, failed to create sync restore thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sError("%s, failed to create sync restore thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
taosClose(pPeer->syncFd); taosCloseSocket(pPeer->syncFd);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
} else { } else {
sInfo("%s, sync restore thread:0x%08" PRIx64 " create successfully, rid:%" PRId64, pPeer->id, sInfo("%s, sync restore thread:0x%08" PRIx64 " create successfully, rid:%" PRId64, pPeer->id,
@ -1179,7 +1179,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
} }
} }
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
char ipstr[24]; char ipstr[24];
int32_t i; int32_t i;

View File

@ -365,7 +365,7 @@ void *syncRestoreData(void *param) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1); atomic_add_fetch_32(&tsSyncNum, 1);
sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to restore data, sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
(*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING); (*pNode->notifyRole)(pNode->vgId, TAOS_SYNC_ROLE_SYNCING);
@ -390,9 +390,9 @@ void *syncRestoreData(void *param) {
nodeSStatus = TAOS_SYNC_STATUS_INIT; nodeSStatus = TAOS_SYNC_STATUS_INIT;
sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, restore data over, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
taosClose(pPeer->syncFd); taosCloseSocket(pPeer->syncFd);
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1); atomic_sub_fetch_32(&tsSyncNum, 1);
// The ref is obtained in both the create thread and the current thread, so it is released twice // The ref is obtained in both the create thread and the current thread, so it is released twice
syncReleasePeer(pPeer); syncReleasePeer(pPeer);

View File

@ -14,7 +14,6 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include <sys/inotify.h>
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
@ -160,7 +159,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
break; break;
} }
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret < 0) { if (ret < 0) {
code = -1; code = -1;
@ -228,7 +227,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
return -1; return -1;
} }
int32_t code = taosLSeek(sfd, offset, SEEK_SET); int32_t code = (int32_t)taosLSeek(sfd, offset, SEEK_SET);
if (code < 0) { if (code < 0) {
sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno)); sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
close(sfd); close(sfd);
@ -322,7 +321,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if all data are read out, and no update // if all data are read out, and no update
if (bytes == 0 && !walModified) { if (bytes == 0 && !walModified) {
// wal not closed, it means some data not flushed to disk, wait for a while // wal not closed, it means some data not flushed to disk, wait for a while
usleep(10000); taosMsleep(10);
} }
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again // if bytes > 0, file is updated, or fversion is not reached but file still open, read again
@ -384,7 +383,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
break; break;
} }
code = taosSendFile(pPeer->syncFd, sfd, NULL, size); code = (int32_t)taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd); close(sfd);
if (code < 0) { if (code < 0) {
sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code); sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
@ -501,7 +500,7 @@ void *syncRetrieveData(void *param) {
} }
pPeer->fileChanged = 0; pPeer->fileChanged = 0;
taosClose(pPeer->syncFd); taosCloseSocket(pPeer->syncFd);
// The ref is obtained in both the create thread and the current thread, so it is released twice // The ref is obtained in both the create thread and the current thread, so it is released twice
sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, sync retrieve data over, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);

View File

@ -24,10 +24,18 @@
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h" #include "syncTcp.h"
#ifdef WINDOWS
#include "wepoll.h"
#endif
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
bool stop; bool stop;
int32_t pollFd; SOCKET pollFd;
int32_t numOfFds; int32_t numOfFds;
struct SPoolObj *pPool; struct SPoolObj *pPool;
} SThreadObj; } SThreadObj;
@ -37,13 +45,13 @@ typedef struct SPoolObj {
SThreadObj **pThread; SThreadObj **pThread;
pthread_t thread; pthread_t thread;
int32_t nextId; int32_t nextId;
int32_t acceptFd; // FD for accept new connection SOCKET acceptFd; // FD for accept new connection
} SPoolObj; } SPoolObj;
typedef struct { typedef struct {
SThreadObj *pThread; SThreadObj *pThread;
int64_t handleId; int64_t handleId;
int32_t fd; SOCKET fd;
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
@ -82,7 +90,7 @@ void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) { if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
sError("failed to create accept thread for TCP server since %s", strerror(errno)); sError("failed to create accept thread for TCP server since %s", strerror(errno));
close(pPool->acceptFd); taosCloseSocket(pPool->acceptFd);
tfree(pPool->pThread); tfree(pPool->pThread);
tfree(pPool); tfree(pPool);
return NULL; return NULL;
@ -112,7 +120,7 @@ void syncCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { void *syncAllocateTcpConn(void *param, int64_t rid, SOCKET connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
@ -169,7 +177,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
pThread->numOfFds--; pThread->numOfFds--;
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
taosClose(pConn->fd); taosCloseSocket(pConn->fd);
tfree(pConn); tfree(pConn);
} }
@ -233,7 +241,7 @@ static void *syncProcessTcpData(void *param) {
sDebug("%p TCP epoll thread exits", pThread); sDebug("%p TCP epoll thread exits", pThread);
close(pThread->pollFd); taosCloseSocket(pThread->pollFd);
tfree(pThread); tfree(pThread);
tfree(buffer); tfree(buffer);
return NULL; return NULL;
@ -248,7 +256,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) {
while (1) { while (1) {
struct sockaddr_in clientAddr; struct sockaddr_in clientAddr;
socklen_t addrlen = sizeof(clientAddr); socklen_t addrlen = sizeof(clientAddr);
int32_t connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); SOCKET connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
if (connFd < 0) { if (connFd < 0) {
if (errno == EINVAL) { if (errno == EINVAL) {
sDebug("%p TCP server accept is exiting...", pPool); sDebug("%p TCP server accept is exiting...", pPool);
@ -264,7 +272,7 @@ static void *syncAcceptPeerTcpConnection(void *argv) {
(*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr); (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
} }
taosClose(pPool->acceptFd); taosCloseSocket(pPool->acceptFd);
return NULL; return NULL;
} }
@ -277,7 +285,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
if (pThread == NULL) return NULL; if (pThread == NULL) return NULL;
pThread->pPool = pPool; pThread->pPool = pPool;
pThread->pollFd = epoll_create(10); // size does not matter pThread->pollFd = (SOCKET)epoll_create(10); // size does not matter
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
tfree(pThread); tfree(pThread);
return NULL; return NULL;
@ -290,7 +298,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
close(pThread->pollFd); taosCloseSocket(pThread->pollFd);
tfree(pThread); tfree(pThread);
return NULL; return NULL;
} }

View File

@ -20,6 +20,14 @@
extern "C" { extern "C" {
#endif #endif
#ifdef WINDOWS
#include "wepoll.h"
#endif
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
int32_t taosReadn(SOCKET sock, char *buffer, int32_t len); int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes); int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes); int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);

View File

@ -78,7 +78,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
len = fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
@ -103,14 +103,14 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.dbCfgVersion = dbCfgVersion->valueint; vnodeMsg.cfg.dbCfgVersion = (int32_t)dbCfgVersion->valueint;
cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion"); cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion");
if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) { if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file);
vnodeMsg.cfg.vgCfgVersion = 0; vnodeMsg.cfg.vgCfgVersion = 0;
} else { } else {
vnodeMsg.cfg.vgCfgVersion = vgCfgVersion->valueint; vnodeMsg.cfg.vgCfgVersion = (int32_t)vgCfgVersion->valueint;
} }
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
@ -118,56 +118,56 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
vError("vgId:%d, failed to read %s, cacheBlockSize not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, cacheBlockSize not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.cacheBlockSize = cacheBlockSize->valueint; vnodeMsg.cfg.cacheBlockSize = (int32_t)cacheBlockSize->valueint;
cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks"); cJSON *totalBlocks = cJSON_GetObjectItem(root, "totalBlocks");
if (!totalBlocks || totalBlocks->type != cJSON_Number) { if (!totalBlocks || totalBlocks->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, totalBlocks not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, totalBlocks not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.totalBlocks = totalBlocks->valueint; vnodeMsg.cfg.totalBlocks = (int32_t)totalBlocks->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) { if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysPerFile not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, daysPerFile not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.daysPerFile = daysPerFile->valueint; vnodeMsg.cfg.daysPerFile = (int32_t)daysPerFile->valueint;
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep"); cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
if (!daysToKeep || daysToKeep->type != cJSON_Number) { if (!daysToKeep || daysToKeep->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, daysToKeep not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.daysToKeep = daysToKeep->valueint; vnodeMsg.cfg.daysToKeep = (int32_t)daysToKeep->valueint;
cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1"); cJSON *daysToKeep1 = cJSON_GetObjectItem(root, "daysToKeep1");
if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) { if (!daysToKeep1 || daysToKeep1->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep1 not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, daysToKeep1 not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.daysToKeep1 = daysToKeep1->valueint; vnodeMsg.cfg.daysToKeep1 = (int32_t)daysToKeep1->valueint;
cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2"); cJSON *daysToKeep2 = cJSON_GetObjectItem(root, "daysToKeep2");
if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) { if (!daysToKeep2 || daysToKeep2->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, daysToKeep2 not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, daysToKeep2 not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.daysToKeep2 = daysToKeep2->valueint; vnodeMsg.cfg.daysToKeep2 = (int32_t)daysToKeep2->valueint;
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock"); cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) { if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, minRowsPerFileBlock not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint; vnodeMsg.cfg.minRowsPerFileBlock = (int32_t)minRowsPerFileBlock->valueint;
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock"); cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) { if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, maxRowsPerFileBlock not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint; vnodeMsg.cfg.maxRowsPerFileBlock = (int32_t)maxRowsPerFileBlock->valueint;
cJSON *precision = cJSON_GetObjectItem(root, "precision"); cJSON *precision = cJSON_GetObjectItem(root, "precision");
if (!precision || precision->type != cJSON_Number) { if (!precision || precision->type != cJSON_Number) {
@ -195,7 +195,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
vError("vgId:%d, failed to read %s, fsyncPeriod not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, fsyncPeriod not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.fsyncPeriod = fsyncPeriod->valueint; vnodeMsg.cfg.fsyncPeriod = (int32_t)fsyncPeriod->valueint;
cJSON *wals = cJSON_GetObjectItem(root, "wals"); cJSON *wals = cJSON_GetObjectItem(root, "wals");
if (!wals || wals->type != cJSON_Number) { if (!wals || wals->type != cJSON_Number) {
@ -258,7 +258,7 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
vError("vgId:%d, failed to read %s, nodeId not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, nodeId not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
node->nodeId = nodeId->valueint; node->nodeId = (int32_t)nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {

View File

@ -4,7 +4,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_LIBRARY(twal ${SRC}) ADD_LIBRARY(twal ${SRC})
TARGET_LINK_LIBRARIES(twal tutil common) TARGET_LINK_LIBRARIES(twal tutil common)
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)

View File

@ -210,7 +210,7 @@ static int32_t walCreateThread() {
static void walStopThread() { static void walStopThread() {
tsWal.stop = 1; tsWal.stop = 1;
if (tsWal.thread) { if (taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL); pthread_join(tsWal.thread, NULL);
} }

View File

@ -272,7 +272,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead *pHead = buffer; SWalHead *pHead = buffer;
while (1) { while (1) {
int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead)); int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead));
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
@ -307,7 +307,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
} }
} }
ret = tfRead(tfd, pHead->cont, pHead->len); ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);