fix coverity scan problem
This commit is contained in:
parent
4223b43a3a
commit
c9aadfadee
|
@ -1059,7 +1059,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
|
|
||||||
if (param->val == NULL) {
|
if (param->val == NULL) {
|
||||||
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
|
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
|
||||||
return -1;
|
goto END;
|
||||||
} else {
|
} else {
|
||||||
if (IS_VAR_DATA_TYPE(param->type)) {
|
if (IS_VAR_DATA_TYPE(param->type)) {
|
||||||
tagData = varDataVal(param->val);
|
tagData = varDataVal(param->val);
|
||||||
|
@ -1111,27 +1111,25 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (p->suid != pKey->suid) {
|
if (p == NULL || p->suid != pKey->suid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
first = false;
|
first = false;
|
||||||
if (p != NULL) {
|
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
||||||
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
if (cmp == 0) {
|
||||||
if (cmp == 0) {
|
// match
|
||||||
// match
|
tb_uid_t tuid = 0;
|
||||||
tb_uid_t tuid = 0;
|
if (IS_VAR_DATA_TYPE(pKey->type)) {
|
||||||
if (IS_VAR_DATA_TYPE(pKey->type)) {
|
tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
|
||||||
tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
|
|
||||||
} else {
|
|
||||||
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
|
|
||||||
}
|
|
||||||
taosArrayPush(pUids, &tuid);
|
|
||||||
} else if (cmp == 1) {
|
|
||||||
// not match but should continue to iter
|
|
||||||
} else {
|
} else {
|
||||||
// not match and no more result
|
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
taosArrayPush(pUids, &tuid);
|
||||||
|
} else if (cmp == 1) {
|
||||||
|
// not match but should continue to iter
|
||||||
|
} else {
|
||||||
|
// not match and no more result
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||||
if (valid < 0) {
|
if (valid < 0) {
|
||||||
|
|
|
@ -119,6 +119,8 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
|
||||||
taosArrayDestroy(pTagVals);
|
taosArrayDestroy(pTagVals);
|
||||||
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
|
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
|
|
||||||
|
taosArrayDestroy(pTagVals);
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -159,6 +161,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
|
||||||
memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
|
memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE);
|
||||||
type = TSDB_DATA_TYPE_VARCHAR;
|
type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, val, len);
|
term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, val, len);
|
||||||
|
taosMemoryFree(val);
|
||||||
} else if (pTagVal->nData == 0) {
|
} else if (pTagVal->nData == 0) {
|
||||||
term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0);
|
term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0);
|
||||||
}
|
}
|
||||||
|
@ -177,6 +180,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
|
||||||
}
|
}
|
||||||
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
|
indexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
|
taosArrayDestroy(pTagVals);
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,7 @@ void idxReleaseRef(int64_t ref);
|
||||||
#define IDX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
|
#define IDX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
|
||||||
do { \
|
do { \
|
||||||
uint8_t oldTy = ty; \
|
uint8_t oldTy = ty; \
|
||||||
ty = (ty >> 4) | exTy; \
|
ty = ((ty >> 4) & 0xFF) | exTy; \
|
||||||
ty = (ty << 4) | oldTy; \
|
ty = (ty << 4) | oldTy; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (idx != NULL) {
|
if (idx != NULL) {
|
||||||
indexClose(idx);
|
indexDestroy(idx);
|
||||||
}
|
}
|
||||||
*index = NULL;
|
*index = NULL;
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -538,7 +538,7 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
idxCacheRef(pCache);
|
idxCacheRef(pCache);
|
||||||
// encode data
|
// encode data
|
||||||
CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
|
CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
|
||||||
if (cache == NULL) {
|
if (ct == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// set up key
|
// set up key
|
||||||
|
@ -730,15 +730,17 @@ static int32_t idxCacheJsonTermCompare(const void* l, const void* r) {
|
||||||
return cmp;
|
return cmp;
|
||||||
}
|
}
|
||||||
static MemTable* idxInternalCacheCreate(int8_t type) {
|
static MemTable* idxInternalCacheCreate(int8_t type) {
|
||||||
int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY;
|
// int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY :
|
||||||
|
// TSDB_DATA_TYPE_BINARY;
|
||||||
|
int ttype = TSDB_DATA_TYPE_BINARY;
|
||||||
int32_t (*cmpFn)(const void* l, const void* r) =
|
int32_t (*cmpFn)(const void* l, const void* r) =
|
||||||
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
|
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
|
||||||
|
|
||||||
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
||||||
idxMemRef(tbl);
|
idxMemRef(tbl);
|
||||||
if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
|
// if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
|
||||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet);
|
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet);
|
||||||
}
|
//}
|
||||||
return tbl;
|
return tbl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -427,6 +427,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
|
||||||
*dst = taosMemoryCalloc(1, bufSize + 1);
|
*dst = taosMemoryCalloc(1, bufSize + 1);
|
||||||
idxInt2str(*(uint64_t*)src, *dst, 1);
|
idxInt2str(*(uint64_t*)src, *dst, 1);
|
||||||
tlen = strlen(*dst);
|
tlen = strlen(*dst);
|
||||||
|
break;
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
*dst = taosMemoryCalloc(1, bufSize + 1);
|
*dst = taosMemoryCalloc(1, bufSize + 1);
|
||||||
sprintf(*dst, "%.9lf", *(float*)src);
|
sprintf(*dst, "%.9lf", *(float*)src);
|
||||||
|
|
|
@ -231,7 +231,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue));
|
SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue));
|
||||||
param->colId = -1;
|
param->colId = -1;
|
||||||
param->colValType = (uint8_t)(vn->node.resType.type);
|
param->colValType = (uint8_t)(vn->node.resType.type);
|
||||||
memcpy(param->colName, vn->literal, strlen(vn->literal));
|
memcpy(param->colName, vn->literal, sizeof(param->colName));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_COLUMN: {
|
case QUERY_NODE_COLUMN: {
|
||||||
|
@ -400,54 +400,52 @@ static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reve
|
||||||
static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) {
|
static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) {
|
||||||
int8_t ltype = left->colValType, rtype = right->colValType;
|
int8_t ltype = left->colValType, rtype = right->colValType;
|
||||||
if (ltype == TSDB_DATA_TYPE_FLOAT) {
|
if (ltype == TSDB_DATA_TYPE_FLOAT) {
|
||||||
float f;
|
float f = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, f);
|
SIF_DATA_CONVERT(rtype, right->condValue, f);
|
||||||
typedata->f = f;
|
typedata->f = f;
|
||||||
param->val = &typedata->f;
|
param->val = &typedata->f;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (ltype == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
double d;
|
double d = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, d);
|
SIF_DATA_CONVERT(rtype, right->condValue, d);
|
||||||
typedata->d = d;
|
typedata->d = d;
|
||||||
param->val = &typedata->d;
|
param->val = &typedata->d;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_BIGINT) {
|
} else if (ltype == TSDB_DATA_TYPE_BIGINT) {
|
||||||
int64_t i64;
|
int64_t i64 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, i64);
|
SIF_DATA_CONVERT(rtype, right->condValue, i64);
|
||||||
typedata->i64 = i64;
|
typedata->i64 = i64;
|
||||||
param->val = &typedata->i64;
|
param->val = &typedata->i64;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_INT) {
|
} else if (ltype == TSDB_DATA_TYPE_INT) {
|
||||||
int32_t i32;
|
int32_t i32 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, i32);
|
SIF_DATA_CONVERT(rtype, right->condValue, i32);
|
||||||
typedata->i32 = i32;
|
typedata->i32 = i32;
|
||||||
param->val = &typedata->i32;
|
param->val = &typedata->i32;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_SMALLINT) {
|
} else if (ltype == TSDB_DATA_TYPE_SMALLINT) {
|
||||||
int16_t i16;
|
int16_t i16 = 0;
|
||||||
|
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, i16);
|
SIF_DATA_CONVERT(rtype, right->condValue, i16);
|
||||||
typedata->i16 = i16;
|
typedata->i16 = i16;
|
||||||
param->val = &typedata->i16;
|
param->val = &typedata->i16;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_TINYINT) {
|
} else if (ltype == TSDB_DATA_TYPE_TINYINT) {
|
||||||
int8_t i8;
|
int8_t i8 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, i8)
|
SIF_DATA_CONVERT(rtype, right->condValue, i8)
|
||||||
typedata->i8 = i8;
|
typedata->i8 = i8;
|
||||||
param->val = &typedata->i8;
|
param->val = &typedata->i8;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_UBIGINT) {
|
} else if (ltype == TSDB_DATA_TYPE_UBIGINT) {
|
||||||
uint64_t u64;
|
uint64_t u64 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, u64);
|
SIF_DATA_CONVERT(rtype, right->condValue, u64);
|
||||||
typedata->u64 = u64;
|
typedata->u64 = u64;
|
||||||
param->val = &typedata->u64;
|
param->val = &typedata->u64;
|
||||||
|
|
||||||
} else if (ltype == TSDB_DATA_TYPE_UINT) {
|
} else if (ltype == TSDB_DATA_TYPE_UINT) {
|
||||||
uint32_t u32;
|
uint32_t u32 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, u32);
|
SIF_DATA_CONVERT(rtype, right->condValue, u32);
|
||||||
typedata->u32 = u32;
|
typedata->u32 = u32;
|
||||||
param->val = &typedata->u32;
|
param->val = &typedata->u32;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_USMALLINT) {
|
} else if (ltype == TSDB_DATA_TYPE_USMALLINT) {
|
||||||
uint16_t u16;
|
uint16_t u16 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, u16);
|
SIF_DATA_CONVERT(rtype, right->condValue, u16);
|
||||||
typedata->u16 = u16;
|
typedata->u16 = u16;
|
||||||
param->val = &typedata->u16;
|
param->val = &typedata->u16;
|
||||||
} else if (ltype == TSDB_DATA_TYPE_UTINYINT) {
|
} else if (ltype == TSDB_DATA_TYPE_UTINYINT) {
|
||||||
uint8_t u8;
|
uint8_t u8 = 0;
|
||||||
SIF_DATA_CONVERT(rtype, right->condValue, u8);
|
SIF_DATA_CONVERT(rtype, right->condValue, u8);
|
||||||
typedata->u8 = u8;
|
typedata->u8 = u8;
|
||||||
param->val = &typedata->u8;
|
param->val = &typedata->u8;
|
||||||
|
@ -663,7 +661,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
if (nParam > 1 && params[1].status == SFLT_NOT_INDEX) {
|
if (nParam > 1 && params[1].status == SFLT_NOT_INDEX) {
|
||||||
output->status = SFLT_NOT_INDEX;
|
output->status = SFLT_NOT_INDEX;
|
||||||
return code;
|
goto _return;
|
||||||
}
|
}
|
||||||
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
|
SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status));
|
||||||
}
|
}
|
||||||
|
|
|
@ -338,7 +338,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) {
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
// 0 indicate that common_input is None
|
// 0 indicate that common_input is None
|
||||||
return v == 0 ? 0 : COMMON_INPUT(v);
|
return COMMON_INPUT(v);
|
||||||
}
|
}
|
||||||
|
|
||||||
// input_len
|
// input_len
|
||||||
|
|
|
@ -72,7 +72,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
|
||||||
if (offset >= ctx->file.size) return 0;
|
if (offset >= ctx->file.size) return 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char key[128] = {0};
|
char key[1024] = {0};
|
||||||
|
assert(strlen(ctx->file.buf) + 1 + 64 < sizeof(key));
|
||||||
idxGenLRUKey(key, ctx->file.buf, blkId);
|
idxGenLRUKey(key, ctx->file.buf, blkId);
|
||||||
LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));
|
LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));
|
||||||
|
|
||||||
|
@ -99,6 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
|
||||||
assert(blk->nread <= kBlockSize);
|
assert(blk->nread <= kBlockSize);
|
||||||
|
|
||||||
if (blk->nread < kBlockSize && blk->nread < len) {
|
if (blk->nread < kBlockSize && blk->nread < len) {
|
||||||
|
taosMemoryFree(blk);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +152,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
if (ctx->type == TFILE) {
|
if (ctx->type == TFILE) {
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
ctx->file.readOnly = readOnly;
|
ctx->file.readOnly = readOnly;
|
||||||
memcpy(ctx->file.buf, path, strlen(path));
|
memcpy(ctx->file.buf, path, sizeof(ctx->file.buf));
|
||||||
if (readOnly == false) {
|
if (readOnly == false) {
|
||||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
taosFtruncateFile(ctx->file.pFile, 0);
|
taosFtruncateFile(ctx->file.pFile, 0);
|
||||||
|
|
|
@ -506,7 +506,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
|
||||||
tfh.suid = suid;
|
tfh.suid = suid;
|
||||||
tfh.version = version;
|
tfh.version = version;
|
||||||
tfh.colType = colType;
|
tfh.colType = colType;
|
||||||
memcpy(tfh.colName, colName, strlen(colName));
|
if (strlen(colName) <= sizeof(tfh.colName)) {
|
||||||
|
memcpy(tfh.colName, colName, strlen(colName));
|
||||||
|
}
|
||||||
|
|
||||||
return tfileWriterCreate(wcx, &tfh);
|
return tfileWriterCreate(wcx, &tfh);
|
||||||
}
|
}
|
||||||
|
@ -580,8 +582,13 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
|
|
||||||
if (cap < ttsz) {
|
if (cap < ttsz) {
|
||||||
cap = ttsz;
|
cap = ttsz;
|
||||||
buf = (char*)taosMemoryRealloc(buf, cap);
|
char* t = (char*)taosMemoryRealloc(buf, cap);
|
||||||
|
if (t == NULL) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char* p = buf;
|
char* p = buf;
|
||||||
tfileSerialTableIdsToBuf(p, v->tableId);
|
tfileSerialTableIdsToBuf(p, v->tableId);
|
||||||
tw->ctx->write(tw->ctx, buf, ttsz);
|
tw->ctx->write(tw->ctx, buf, ttsz);
|
||||||
|
|
|
@ -146,7 +146,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
uError("http-report recv error:%s", uv_err_name(nread));
|
uError("http-report recv error:%s", uv_err_name(nread));
|
||||||
} else {
|
} else {
|
||||||
uTrace("http-report succ to recv %d bytes", nread);
|
uTrace("http-report succ to recv %d bytes", (int32_t)nread);
|
||||||
}
|
}
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,11 +54,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
pRpc->retry = pInit->rfp;
|
pRpc->retry = pInit->rfp;
|
||||||
pRpc->startTimer = pInit->tfp;
|
pRpc->startTimer = pInit->tfp;
|
||||||
|
|
||||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
|
||||||
} else {
|
|
||||||
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t ip = 0;
|
uint32_t ip = 0;
|
||||||
if (pInit->connType == TAOS_CONN_SERVER) {
|
if (pInit->connType == TAOS_CONN_SERVER) {
|
||||||
|
@ -79,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
}
|
}
|
||||||
pRpc->parent = pInit->parent;
|
pRpc->parent = pInit->parent;
|
||||||
if (pInit->user) {
|
if (pInit->user) {
|
||||||
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
|
memcpy(pRpc->user, pInit->user, TSDB_UNI_LEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
|
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
|
||||||
|
|
|
@ -267,11 +267,12 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
|
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
|
||||||
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
|
||||||
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
|
||||||
#define EPSET_FORWARD_INUSE(epSet) \
|
#define EPSET_FORWARD_INUSE(epSet) \
|
||||||
do { \
|
do { \
|
||||||
if ((epSet)->numOfEps != 0) { \
|
if ((epSet)->numOfEps != 0) { \
|
||||||
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
++((epSet)->inUse); \
|
||||||
} \
|
(epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \
|
||||||
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
||||||
|
@ -503,6 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
SConnList list = {0};
|
SConnList list = {0};
|
||||||
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
|
||||||
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
|
if (plist == NULL) return NULL;
|
||||||
QUEUE_INIT(&plist->conns);
|
QUEUE_INIT(&plist->conns);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1157,7 +1159,7 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
|
SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj));
|
||||||
|
|
||||||
STrans* pTransInst = shandle;
|
STrans* pTransInst = shandle;
|
||||||
memcpy(cli->label, label, strlen(label));
|
memcpy(cli->label, label, TSDB_LABEL_LEN);
|
||||||
cli->numOfThreads = numOfThreads;
|
cli->numOfThreads = numOfThreads;
|
||||||
cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
|
cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*));
|
||||||
|
|
||||||
|
@ -1611,8 +1613,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
|
||||||
|
|
||||||
SCvtAddr cvtAddr = {0};
|
SCvtAddr cvtAddr = {0};
|
||||||
if (ip != NULL && fqdn != NULL) {
|
if (ip != NULL && fqdn != NULL) {
|
||||||
memcpy(cvtAddr.ip, ip, strlen(ip));
|
if (strlen(ip) <= sizeof(cvtAddr.ip)) memcpy(cvtAddr.ip, ip, strlen(ip));
|
||||||
memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn));
|
if (strlen(fqdn) <= sizeof(cvtAddr.fqdn)) memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn));
|
||||||
cvtAddr.cvt = true;
|
cvtAddr.cvt = true;
|
||||||
}
|
}
|
||||||
for (int i = 0; i < pTransInst->numOfThreads; i++) {
|
for (int i = 0; i < pTransInst->numOfThreads; i++) {
|
||||||
|
|
|
@ -590,7 +590,9 @@ TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t
|
||||||
taosCloseSocket(&pSocket);
|
taosCloseSocket(&pSocket);
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
taosKeepTcpAlive(pSocket);
|
if (taosKeepTcpAlive(pSocket) == -1) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return pSocket;
|
return pSocket;
|
||||||
|
@ -1059,18 +1061,22 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
||||||
}
|
}
|
||||||
#if defined(WINDOWS)
|
#if defined(WINDOWS)
|
||||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
|
||||||
|
taosCloseSocketNoCheck1(fd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#elif defined(_TD_DARWIN_64)
|
#elif defined(_TD_DARWIN_64)
|
||||||
uint32_t conn_timeout_ms = timeout * 1000;
|
uint32_t conn_timeout_ms = timeout * 1000;
|
||||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||||
|
taosCloseSocketNoCheck1(fd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#else // Linux like systems
|
#else // Linux like systems
|
||||||
uint32_t conn_timeout_ms = timeout * 1000;
|
uint32_t conn_timeout_ms = timeout * 1000;
|
||||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||||
|
taosCloseSocketNoCheck1(fd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return (int)fd;
|
return (int)fd;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
|
||||||
if (pSched->queue == NULL) {
|
if (pSched->queue == NULL) {
|
||||||
uError("%s: no enough memory for queue", label);
|
uError("%s: no enough memory for queue", label);
|
||||||
taosCleanUpScheduler(pSched);
|
taosCleanUpScheduler(pSched);
|
||||||
|
taosMemoryFree(pSched);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue