Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/ly_res
This commit is contained in:
commit
436c2dbd29
|
@ -44,8 +44,50 @@ TDengine 可以高效地从 Kafka 读取数据并将其写入 TDengine,以实
|
||||||
|
|
||||||
如果服务端开启了 SASL 认证机制,此处需要启用 SASL 并配置相关内容,目前支持 PLAIN/SCRAM-SHA-256/GSSAPI 三种认证机制,请按实际情况进行选择。
|
如果服务端开启了 SASL 认证机制,此处需要启用 SASL 并配置相关内容,目前支持 PLAIN/SCRAM-SHA-256/GSSAPI 三种认证机制,请按实际情况进行选择。
|
||||||
|
|
||||||
|
#### 4.1. PLAIN 认证
|
||||||
|
|
||||||
|
选择 `PLAIN` 认证机制,输入用户名和密码:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
#### 4.1. SCRAM(SCRAM-SHA-256) 认证
|
||||||
|
|
||||||
|
选择 `SCRAM-SHA-256` 认证机制,输入用户名和密码:
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
|
#### 4.3. GSSAPI 认证
|
||||||
|
|
||||||
|
选择 `GSSAPI` ,将通过 [RDkafka 客户端](https://github.com/confluentinc/librdkafka) 调用 GSSAPI 应用 Kerberos 认证机制:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
需要输入的信息有:
|
||||||
|
|
||||||
|
- Kerberos 服务名,一般是 `kafka`;
|
||||||
|
- Kerberos 认证主体,即认证用户名,例如 `kafkaclient`;
|
||||||
|
- Kerberos 初始化命令(可选,一般不用填写);
|
||||||
|
- Kerberos 密钥表,需提供文件并上传;
|
||||||
|
|
||||||
|
以上信息均需由 Kafka 服务管理者提供。
|
||||||
|
|
||||||
|
除此之外,在服务器上需要配置 [Kerberos](https://web.mit.edu/kerberos/) 认证服务。在 Ubuntu 下使用 `apt install krb5-user` ;在 CentOS 下,使用 `yum install krb5-workstation`;即可。
|
||||||
|
|
||||||
|
配置完成后,可以使用 [kcat](https://github.com/edenhill/kcat) 工具进行 Kafka 主题消费验证:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
kcat <topic> \
|
||||||
|
-b <kafka-server:port> \
|
||||||
|
-G kcat \
|
||||||
|
-X security.protocol=SASL_PLAINTEXT \
|
||||||
|
-X sasl.mechanism=GSSAPI \
|
||||||
|
-X sasl.kerberos.keytab=</path/to/kafkaclient.keytab> \
|
||||||
|
-X sasl.kerberos.principal=<kafkaclient> \
|
||||||
|
-X sasl.kerberos.service.name=kafka
|
||||||
|
```
|
||||||
|
|
||||||
|
如果出现错误:“Server xxxx not found in kerberos database”,则需要配置 Kafka 节点对应的域名并在 Kerberos 客户端配置文件 `/etc/krb5.conf` 中配置反向域名解析 `rdns = true`。
|
||||||
|
|
||||||
### 5. 配置 SSL 证书
|
### 5. 配置 SSL 证书
|
||||||
|
|
||||||
如果服务端开启了 SSL 加密认证,此处需要启用 SSL 并配置相关内容。
|
如果服务端开启了 SSL 加密认证,此处需要启用 SSL 并配置相关内容。
|
||||||
|
@ -160,4 +202,4 @@ json 数据支持 JSONObject 或者 JSONArray,使用 json 解析器可以解
|
||||||
|
|
||||||
### 9. 创建完成
|
### 9. 创建完成
|
||||||
|
|
||||||
点击 **提交** 按钮,完成创建 Kafka 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
点击 **提交** 按钮,完成创建 Kafka 到 TDengine 的数据同步任务,回到**数据源列表**页面可查看任务执行情况。
|
||||||
|
|
Binary file not shown.
After Width: | Height: | Size: 43 KiB |
Binary file not shown.
After Width: | Height: | Size: 20 KiB |
|
@ -2281,6 +2281,10 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
||||||
char *buf = taosMemoryMalloc(bytes);
|
char *buf = taosMemoryMalloc(bytes);
|
||||||
|
if (buf == NULL) {
|
||||||
|
mError("db:%s, failed to malloc buffer", pDb->name);
|
||||||
|
return;
|
||||||
|
}
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
|
|
@ -184,6 +184,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
|
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
|
||||||
|
int32_t code = 0;
|
||||||
mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
|
mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
|
||||||
|
|
||||||
taosWLockLatch(&pOld->lock);
|
taosWLockLatch(&pOld->lock);
|
||||||
|
@ -205,6 +206,11 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
|
||||||
if (pNew->commentSize > 0 && pNew->pComment != NULL) {
|
if (pNew->commentSize > 0 && pNew->pComment != NULL) {
|
||||||
pOld->commentSize = pNew->commentSize;
|
pOld->commentSize = pNew->commentSize;
|
||||||
pOld->pComment = taosMemoryMalloc(pOld->commentSize);
|
pOld->pComment = taosMemoryMalloc(pOld->commentSize);
|
||||||
|
if (pOld->pComment == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
taosWUnLockLatch(&pOld->lock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
(void)memcpy(pOld->pComment, pNew->pComment, pOld->commentSize);
|
(void)memcpy(pOld->pComment, pNew->pComment, pOld->commentSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,6 +221,11 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
|
||||||
if (pNew->codeSize > 0 && pNew->pCode != NULL) {
|
if (pNew->codeSize > 0 && pNew->pCode != NULL) {
|
||||||
pOld->codeSize = pNew->codeSize;
|
pOld->codeSize = pNew->codeSize;
|
||||||
pOld->pCode = taosMemoryMalloc(pOld->codeSize);
|
pOld->pCode = taosMemoryMalloc(pOld->codeSize);
|
||||||
|
if (pOld->pCode == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
taosWUnLockLatch(&pOld->lock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
(void)memcpy(pOld->pCode, pNew->pCode, pOld->codeSize);
|
(void)memcpy(pOld->pCode, pNew->pCode, pOld->codeSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,6 +272,10 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
|
||||||
if (NULL != pCreate->pComment) {
|
if (NULL != pCreate->pComment) {
|
||||||
func.commentSize = strlen(pCreate->pComment) + 1;
|
func.commentSize = strlen(pCreate->pComment) + 1;
|
||||||
func.pComment = taosMemoryMalloc(func.commentSize);
|
func.pComment = taosMemoryMalloc(func.commentSize);
|
||||||
|
if (func.pComment == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
func.codeSize = pCreate->codeLen;
|
func.codeSize = pCreate->codeLen;
|
||||||
func.pCode = taosMemoryMalloc(func.codeSize);
|
func.pCode = taosMemoryMalloc(func.codeSize);
|
||||||
|
@ -716,6 +731,11 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
||||||
? TSDB_MAX_BINARY_LEN
|
? TSDB_MAX_BINARY_LEN
|
||||||
: pFunc->codeSize + VARSTR_HEADER_SIZE;
|
: pFunc->codeSize + VARSTR_HEADER_SIZE;
|
||||||
char *b4 = taosMemoryMalloc(varCodeLen);
|
char *b4 = taosMemoryMalloc(varCodeLen);
|
||||||
|
if (b4 == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
sdbRelease(pSdb, pFunc);
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
(void)memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE);
|
(void)memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE);
|
||||||
varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE);
|
varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE);
|
||||||
code = colDataSetVal(pColInfo, numOfRows, (const char *)b4, false);
|
code = colDataSetVal(pColInfo, numOfRows, (const char *)b4, false);
|
||||||
|
|
|
@ -343,6 +343,10 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
code = tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
|
code = tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
|
@ -369,6 +373,10 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeType
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
|
code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
|
@ -395,6 +403,10 @@ static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pA
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
|
code = tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
|
@ -420,6 +432,10 @@ static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDrop
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
|
int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
|
code = tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
|
|
|
@ -2352,6 +2352,11 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
|
||||||
}
|
}
|
||||||
|
|
||||||
void *cont = taosMemoryMalloc(contLen);
|
void *cont = taosMemoryMalloc(contLen);
|
||||||
|
if (NULL == cont) {
|
||||||
|
code = terrno;
|
||||||
|
tFreeSMAlterStbRsp(&alterRsp);
|
||||||
|
TAOS_RETURN(code);
|
||||||
|
}
|
||||||
tEncoderInit(&ec, cont, contLen);
|
tEncoderInit(&ec, cont, contLen);
|
||||||
code = tEncodeSMAlterStbRsp(&ec, &alterRsp);
|
code = tEncodeSMAlterStbRsp(&ec, &alterRsp);
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
|
@ -2407,6 +2412,11 @@ int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, vo
|
||||||
}
|
}
|
||||||
|
|
||||||
void *cont = taosMemoryMalloc(contLen);
|
void *cont = taosMemoryMalloc(contLen);
|
||||||
|
if (NULL == cont) {
|
||||||
|
code = terrno;
|
||||||
|
tFreeSMCreateStbRsp(&stbRsp);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
tEncoderInit(&ec, cont, contLen);
|
tEncoderInit(&ec, cont, contLen);
|
||||||
TAOS_CHECK_GOTO(tEncodeSMCreateStbRsp(&ec, &stbRsp), NULL, _OVER);
|
TAOS_CHECK_GOTO(tEncodeSMCreateStbRsp(&ec, &stbRsp), NULL, _OVER);
|
||||||
tEncoderClear(&ec);
|
tEncoderClear(&ec);
|
||||||
|
|
|
@ -367,6 +367,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->paramLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pTrans->paramLen, _OVER)
|
||||||
if (pTrans->paramLen != 0) {
|
if (pTrans->paramLen != 0) {
|
||||||
pTrans->param = taosMemoryMalloc(pTrans->paramLen);
|
pTrans->param = taosMemoryMalloc(pTrans->paramLen);
|
||||||
|
if (pTrans->param == NULL) goto _OVER;
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct SMetaStbStatsEntry {
|
||||||
} SMetaStbStatsEntry;
|
} SMetaStbStatsEntry;
|
||||||
|
|
||||||
typedef struct STagFilterResEntry {
|
typedef struct STagFilterResEntry {
|
||||||
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
|
SHashObj *set; // the set of md5 digest, extracted from the serialized tag query condition
|
||||||
uint32_t hitTimes; // queried times for current super table
|
uint32_t hitTimes; // queried times for current super table
|
||||||
} STagFilterResEntry;
|
} STagFilterResEntry;
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ static void statsCacheClose(SMeta* pMeta) {
|
||||||
|
|
||||||
static void freeCacheEntryFp(void* param) {
|
static void freeCacheEntryFp(void* param) {
|
||||||
STagFilterResEntry** p = param;
|
STagFilterResEntry** p = param;
|
||||||
tdListEmpty(&(*p)->list);
|
taosHashCleanup((*p)->set);
|
||||||
taosMemoryFreeClear(*p);
|
taosMemoryFreeClear(*p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,10 +200,12 @@ void metaCacheClose(SMeta* pMeta) {
|
||||||
entryCacheClose(pMeta);
|
entryCacheClose(pMeta);
|
||||||
statsCacheClose(pMeta);
|
statsCacheClose(pMeta);
|
||||||
|
|
||||||
|
taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
||||||
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
|
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
|
||||||
(void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
|
(void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
|
||||||
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
|
||||||
|
|
||||||
|
taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
|
||||||
taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
|
taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
|
||||||
(void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
|
(void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
|
||||||
taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
|
taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
|
||||||
|
@ -471,34 +473,6 @@ int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int checkAllEntriesInCache(const STagFilterResEntry* pEntry, SArray* pInvalidRes, int32_t keyLen,
|
|
||||||
SLRUCache* pCache, uint64_t suid) {
|
|
||||||
SListIter iter = {0};
|
|
||||||
tdListInitIter((SList*)&(pEntry->list), &iter, TD_LIST_FORWARD);
|
|
||||||
|
|
||||||
SListNode* pNode = NULL;
|
|
||||||
uint64_t buf[3];
|
|
||||||
buf[0] = suid;
|
|
||||||
|
|
||||||
int32_t len = sizeof(uint64_t) * tListLen(buf);
|
|
||||||
|
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
memcpy(&buf[1], pNode->data, keyLen);
|
|
||||||
|
|
||||||
// check whether it is existed in LRU cache, and remove it from linked list if not.
|
|
||||||
LRUHandle* pRes = taosLRUCacheLookup(pCache, buf, len);
|
|
||||||
if (pRes == NULL) { // remove the item in the linked list
|
|
||||||
if (taosArrayPush(pInvalidRes, &pNode) == NULL) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
bool ret = taosLRUCacheRelease(pCache, pRes, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
|
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
|
||||||
memcpy(&pBuf[2], key, keyLen);
|
memcpy(&pBuf[2], key, keyLen);
|
||||||
}
|
}
|
||||||
|
@ -584,22 +558,11 @@ static void freeUidCachePayload(const void* key, size_t keyLen, void* value, voi
|
||||||
|
|
||||||
if (pEntry != NULL && (*pEntry) != NULL) {
|
if (pEntry != NULL && (*pEntry) != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
|
||||||
SListIter iter = {0};
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
|
||||||
SListNode* pNode = NULL;
|
el);
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
uint64_t* digest = (uint64_t*)pNode->data;
|
|
||||||
if (digest[0] == p[2] && digest[1] == p[3]) {
|
|
||||||
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
|
|
||||||
taosMemoryFree(tmp);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
|
|
||||||
el);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -607,16 +570,30 @@ static void freeUidCachePayload(const void* key, size_t keyLen, void* value, voi
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
|
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
|
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
|
||||||
if (p == NULL) {
|
TSDB_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
p->hitTimes = 0;
|
p->hitTimes = 0;
|
||||||
tdListInit(&p->list, keyLen);
|
p->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
TAOS_CHECK_RETURN(taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES));
|
TSDB_CHECK_NULL(p->set, code, lino, _end, terrno);
|
||||||
TAOS_CHECK_RETURN(tdListAppend(&p->list, pKey));
|
code = taosHashPut(p->set, pKey, keyLen, NULL, 0);
|
||||||
return 0;
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
|
code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
if (p != NULL) {
|
||||||
|
if (p->set != NULL) {
|
||||||
|
taosHashCleanup(p->set);
|
||||||
|
}
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check both the payload size and selectivity ratio
|
// check both the payload size and selectivity ratio
|
||||||
|
@ -657,25 +634,14 @@ int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
} else { // check if it exists or not
|
} else { // check if it exists or not
|
||||||
size_t size = listNEles(&(*pEntry)->list);
|
code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
|
||||||
if (size == 0) {
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
// we have already found the existed items, no need to added to cache anymore.
|
||||||
if (code) {
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SListNode* pNode = listHead(&(*pEntry)->list);
|
goto _end;
|
||||||
uint64_t* p = (uint64_t*)pNode->data;
|
|
||||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
|
||||||
// we have already found the existed items, no need to added to cache anymore.
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else { // not equal, append it
|
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
|
||||||
if (code) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -703,23 +669,20 @@ int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
(void)taosThreadMutexLock(pLock);
|
(void)taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->hitTimes = 0;
|
(*pEntry)->hitTimes = 0;
|
||||||
|
|
||||||
SListIter iter = {0};
|
char *iter = taosHashIterate((*pEntry)->set, NULL);
|
||||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
while (iter != NULL) {
|
||||||
|
setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
|
||||||
SListNode* pNode = NULL;
|
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
|
|
||||||
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
|
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
|
||||||
|
iter = taosHashIterate((*pEntry)->set, iter);
|
||||||
}
|
}
|
||||||
|
taosHashClear((*pEntry)->set);
|
||||||
tdListEmpty(&(*pEntry)->list);
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
|
|
||||||
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
|
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
|
||||||
|
@ -789,22 +752,11 @@ static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value,
|
||||||
|
|
||||||
if (pEntry != NULL && (*pEntry) != NULL) {
|
if (pEntry != NULL && (*pEntry) != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
|
||||||
SListIter iter = {0};
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
tdListInitIter((SList*)&((*pEntry)->list), &iter, TD_LIST_FORWARD);
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
|
||||||
SListNode* pNode = NULL;
|
taosHashGetSize((*pEntry)->set), el);
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
uint64_t* digest = (uint64_t*)pNode->data;
|
|
||||||
if (digest[0] == p[2] && digest[1] == p[3]) {
|
|
||||||
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
|
|
||||||
taosMemoryFree(tmp);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
|
|
||||||
listNEles(&((*pEntry)->list)), el);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -840,25 +792,14 @@ int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
} else { // check if it exists or not
|
} else { // check if it exists or not
|
||||||
size_t size = listNEles(&(*pEntry)->list);
|
code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
|
||||||
if (size == 0) {
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
// we have already found the existed items, no need to added to cache anymore.
|
||||||
if (code) {
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
goto _end;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SListNode* pNode = listHead(&(*pEntry)->list);
|
goto _end;
|
||||||
uint64_t* p = (uint64_t*)pNode->data;
|
|
||||||
if (p[1] == ((uint64_t*)pKey)[1] && p[0] == ((uint64_t*)pKey)[0]) {
|
|
||||||
// we have already found the existed items, no need to added to cache anymore.
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else { // not equal, append it
|
|
||||||
code = tdListAppend(&(*pEntry)->list, pKey);
|
|
||||||
if (code) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -886,23 +827,20 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
(void)taosThreadMutexLock(pLock);
|
(void)taosThreadMutexLock(pLock);
|
||||||
|
|
||||||
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
|
||||||
if (pEntry == NULL || listNEles(&(*pEntry)->list) == 0) {
|
if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pEntry)->hitTimes = 0;
|
(*pEntry)->hitTimes = 0;
|
||||||
|
|
||||||
SListIter iter = {0};
|
char *iter = taosHashIterate((*pEntry)->set, NULL);
|
||||||
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
|
while (iter != NULL) {
|
||||||
|
setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
|
||||||
SListNode* pNode = NULL;
|
|
||||||
while ((pNode = tdListNext(&iter)) != NULL) {
|
|
||||||
setMD5DigestInKey(p, pNode->data, 2 * sizeof(uint64_t));
|
|
||||||
taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
|
taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
|
||||||
|
iter = taosHashIterate((*pEntry)->set, iter);
|
||||||
}
|
}
|
||||||
|
taosHashClear((*pEntry)->set);
|
||||||
tdListEmpty(&(*pEntry)->list);
|
|
||||||
(void)taosThreadMutexUnlock(pLock);
|
(void)taosThreadMutexUnlock(pLock);
|
||||||
|
|
||||||
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
|
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
|
||||||
|
|
|
@ -40,16 +40,26 @@ int taos_metric_formatter_load_sample_new(taos_metric_formatter_t *self, taos_me
|
||||||
int32_t len = end -start;
|
int32_t len = end -start;
|
||||||
|
|
||||||
char* keyvalues = taosMemoryMalloc(len);
|
char* keyvalues = taosMemoryMalloc(len);
|
||||||
|
if (keyvalues == NULL) return 1;
|
||||||
memset(keyvalues, 0, len);
|
memset(keyvalues, 0, len);
|
||||||
memcpy(keyvalues, start + 1, len - 1);
|
memcpy(keyvalues, start + 1, len - 1);
|
||||||
|
|
||||||
int32_t count = taos_monitor_count_occurrences(keyvalues, ",");
|
int32_t count = taos_monitor_count_occurrences(keyvalues, ",");
|
||||||
|
|
||||||
char** keyvalue = taosMemoryMalloc(sizeof(char*) * (count + 1));
|
char** keyvalue = taosMemoryMalloc(sizeof(char*) * (count + 1));
|
||||||
|
if (keyvalue == NULL) {
|
||||||
|
taosMemoryFreeClear(keyvalues);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
memset(keyvalue, 0, sizeof(char*) * (count + 1));
|
memset(keyvalue, 0, sizeof(char*) * (count + 1));
|
||||||
taos_monitor_split_str(keyvalue, keyvalues, ",");
|
taos_monitor_split_str(keyvalue, keyvalues, ",");
|
||||||
|
|
||||||
char** arr = taosMemoryMalloc(sizeof(char*) * (count + 1) * 2);
|
char** arr = taosMemoryMalloc(sizeof(char*) * (count + 1) * 2);
|
||||||
|
if (arr == NULL) {
|
||||||
|
taosMemoryFreeClear(keyvalue);
|
||||||
|
taosMemoryFreeClear(keyvalues);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
memset(arr, 0, sizeof(char*) * (count + 1) * 2);
|
memset(arr, 0, sizeof(char*) * (count + 1) * 2);
|
||||||
|
|
||||||
bool isfound = true;
|
bool isfound = true;
|
||||||
|
@ -165,6 +175,7 @@ int taos_metric_formatter_load_metric_new(taos_metric_formatter_t *self, taos_me
|
||||||
|
|
||||||
int32_t size = strlen(metric->name);
|
int32_t size = strlen(metric->name);
|
||||||
char* name = taosMemoryMalloc(size + 1);
|
char* name = taosMemoryMalloc(size + 1);
|
||||||
|
if (name == NULL) return 1;
|
||||||
memset(name, 0, size + 1);
|
memset(name, 0, size + 1);
|
||||||
memcpy(name, metric->name, size);
|
memcpy(name, metric->name, size);
|
||||||
char* arr[2] = {0}; //arr[0] is table name, arr[1] is metric name
|
char* arr[2] = {0}; //arr[0] is table name, arr[1] is metric name
|
||||||
|
|
|
@ -37,6 +37,7 @@ void taos_monitor_split_str(char** arr, char* str, const char* del) {
|
||||||
void taos_monitor_split_str_metric(char** arr, taos_metric_t* metric, const char* del, char** buf) {
|
void taos_monitor_split_str_metric(char** arr, taos_metric_t* metric, const char* del, char** buf) {
|
||||||
int32_t size = strlen(metric->name);
|
int32_t size = strlen(metric->name);
|
||||||
char* name = taosMemoryMalloc(size + 1);
|
char* name = taosMemoryMalloc(size + 1);
|
||||||
|
if (name == NULL) return;
|
||||||
memset(name, 0, size + 1);
|
memset(name, 0, size + 1);
|
||||||
memcpy(name, metric->name, size);
|
memcpy(name, metric->name, size);
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ typedef struct TdSocket {
|
||||||
#endif
|
#endif
|
||||||
int refId;
|
int refId;
|
||||||
SocketFd fd;
|
SocketFd fd;
|
||||||
} *TdSocketPtr, TdSocket;
|
} * TdSocketPtr, TdSocket;
|
||||||
|
|
||||||
typedef struct TdSocketServer {
|
typedef struct TdSocketServer {
|
||||||
#if SOCKET_WITH_LOCK
|
#if SOCKET_WITH_LOCK
|
||||||
|
@ -63,7 +63,7 @@ typedef struct TdSocketServer {
|
||||||
#endif
|
#endif
|
||||||
int refId;
|
int refId;
|
||||||
SocketFd fd;
|
SocketFd fd;
|
||||||
} *TdSocketServerPtr, TdSocketServer;
|
} * TdSocketServerPtr, TdSocketServer;
|
||||||
|
|
||||||
typedef struct TdEpoll {
|
typedef struct TdEpoll {
|
||||||
#if SOCKET_WITH_LOCK
|
#if SOCKET_WITH_LOCK
|
||||||
|
@ -71,52 +71,7 @@ typedef struct TdEpoll {
|
||||||
#endif
|
#endif
|
||||||
int refId;
|
int refId;
|
||||||
EpollFd fd;
|
EpollFd fd;
|
||||||
} *TdEpollPtr, TdEpoll;
|
} * TdEpollPtr, TdEpoll;
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t taosSendto(TdSocketPtr pSocket, void *buf, int len, unsigned int flags, const struct sockaddr *dest_addr,
|
|
||||||
int addrlen) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
|
|
||||||
#else
|
|
||||||
return sendto(pSocket->fd, buf, len, flags, dest_addr, addrlen);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosWriteSocket(TdSocketPtr pSocket, void *buf, int len) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return send(pSocket->fd, buf, len, 0);
|
|
||||||
;
|
|
||||||
#else
|
|
||||||
return write(pSocket->fd, buf, len);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
int32_t taosReadSocket(TdSocketPtr pSocket, void *buf, int len) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return recv(pSocket->fd, buf, len, 0);
|
|
||||||
;
|
|
||||||
#else
|
|
||||||
return read(pSocket->fd, buf, len);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosReadFromSocket(TdSocketPtr pSocket, void *buf, int32_t len, int32_t flags, struct sockaddr *destAddr,
|
|
||||||
int *addrLen) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return recvfrom(pSocket->fd, buf, len, flags, destAddr, addrLen);
|
|
||||||
}
|
|
||||||
#endif // endif 0
|
|
||||||
|
|
||||||
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
|
int32_t taosCloseSocketNoCheck1(SocketFd fd) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
@ -145,136 +100,16 @@ int32_t taosCloseSocket(TdSocketPtr *ppSocket) {
|
||||||
code = taosCloseSocketNoCheck1((*ppSocket)->fd);
|
code = taosCloseSocketNoCheck1((*ppSocket)->fd);
|
||||||
(*ppSocket)->fd = -1;
|
(*ppSocket)->fd = -1;
|
||||||
taosMemoryFree(*ppSocket);
|
taosMemoryFree(*ppSocket);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t taosCloseSocketServer(TdSocketServerPtr *ppSocketServer) {
|
|
||||||
int32_t code;
|
|
||||||
if (ppSocketServer == NULL || *ppSocketServer == NULL || (*ppSocketServer)->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
code = taosCloseSocketNoCheck1((*ppSocketServer)->fd);
|
|
||||||
(*ppSocketServer)->fd = -1;
|
|
||||||
taosMemoryFree(*ppSocketServer);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosShutDownSocketRD(TdSocketPtr pSocket) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return closesocket(pSocket->fd);
|
|
||||||
#elif __APPLE__
|
|
||||||
return close(pSocket->fd);
|
|
||||||
#else
|
|
||||||
return shutdown(pSocket->fd, SHUT_RD);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
int32_t taosShutDownSocketServerRD(TdSocketServerPtr pSocketServer) {
|
|
||||||
if (pSocketServer == NULL || pSocketServer->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return closesocket(pSocketServer->fd);
|
|
||||||
#elif __APPLE__
|
|
||||||
return close(pSocketServer->fd);
|
|
||||||
#else
|
|
||||||
return shutdown(pSocketServer->fd, SHUT_RD);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosShutDownSocketWR(TdSocketPtr pSocket) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return closesocket(pSocket->fd);
|
|
||||||
#elif __APPLE__
|
|
||||||
return close(pSocket->fd);
|
|
||||||
#else
|
|
||||||
return shutdown(pSocket->fd, SHUT_WR);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
int32_t taosShutDownSocketServerWR(TdSocketServerPtr pSocketServer) {
|
|
||||||
if (pSocketServer == NULL || pSocketServer->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return closesocket(pSocketServer->fd);
|
|
||||||
#elif __APPLE__
|
|
||||||
return close(pSocketServer->fd);
|
|
||||||
#else
|
|
||||||
return shutdown(pSocketServer->fd, SHUT_WR);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
int32_t taosShutDownSocketRDWR(TdSocketPtr pSocket) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return closesocket(pSocket->fd);
|
|
||||||
#elif __APPLE__
|
|
||||||
return close(pSocket->fd);
|
|
||||||
#else
|
|
||||||
return shutdown(pSocket->fd, SHUT_RDWR);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
int32_t taosShutDownSocketServerRDWR(TdSocketServerPtr pSocketServer) {
|
|
||||||
if (pSocketServer == NULL || pSocketServer->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return closesocket(pSocketServer->fd);
|
|
||||||
#elif __APPLE__
|
|
||||||
return close(pSocketServer->fd);
|
|
||||||
#else
|
|
||||||
return shutdown(pSocketServer->fd, SHUT_RDWR);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosSetNonblocking(TdSocketPtr pSocket, int32_t on) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
u_long mode;
|
|
||||||
if (on) {
|
|
||||||
mode = 1;
|
|
||||||
ioctlsocket(pSocket->fd, FIONBIO, &mode);
|
|
||||||
} else {
|
|
||||||
mode = 0;
|
|
||||||
ioctlsocket(pSocket->fd, FIONBIO, &mode);
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
int32_t flags = 0;
|
|
||||||
if ((flags = fcntl(pSocket->fd, F_GETFL, 0)) < 0) {
|
|
||||||
// printf("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (on)
|
|
||||||
flags |= O_NONBLOCK;
|
|
||||||
else
|
|
||||||
flags &= ~O_NONBLOCK;
|
|
||||||
|
|
||||||
if ((flags = fcntl(pSocket->fd, F_SETFL, flags)) < 0) {
|
|
||||||
// printf("fcntl(F_SETFL) error: %d (%s)\n", errno, strerror(errno));
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif // endif 0
|
|
||||||
|
|
||||||
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
|
int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t optlen) {
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
if (pSocket == NULL || pSocket->fd < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
#ifdef TCP_KEEPCNT
|
#ifdef TCP_KEEPCNT
|
||||||
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
|
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
|
||||||
|
@ -300,11 +135,11 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int ret = setsockopt(pSocket->fd, level, optname, optval, optlen);
|
int ret = setsockopt(pSocket->fd, level, optname, optval, optlen);
|
||||||
if (ret == SOCKET_ERROR) {
|
if (ret == SOCKET_ERROR) {
|
||||||
int errorCode = WSAGetLastError();
|
int errorCode = WSAGetLastError();
|
||||||
return terrno = TAOS_SYSTEM_WINSOCKET_ERROR(errorCode);
|
return terrno = TAOS_SYSTEM_WINSOCKET_ERROR(errorCode);
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
int32_t code = setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
|
int32_t code = setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
|
||||||
if (-1 == code) {
|
if (-1 == code) {
|
||||||
|
@ -315,22 +150,8 @@ if (ret == SOCKET_ERROR) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void *optval, int32_t *optlen) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
return getsockopt(pSocket->fd, level, optname, optval, (int *)optlen);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) {
|
const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) {
|
||||||
const char* r = inet_ntop(AF_INET, &ipInt, dstStr, len);
|
const char *r = inet_ntop(AF_INET, &ipInt, dstStr, len);
|
||||||
if (NULL == r) {
|
if (NULL == r) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
@ -344,403 +165,6 @@ const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) {
|
||||||
|
|
||||||
#define TCP_CONN_TIMEOUT 3000 // conn timeout
|
#define TCP_CONN_TIMEOUT 3000 // conn timeout
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int32_t nleft, nwritten;
|
|
||||||
char *ptr = (char *)buf;
|
|
||||||
|
|
||||||
nleft = nbytes;
|
|
||||||
|
|
||||||
while (nleft > 0) {
|
|
||||||
nwritten = taosWriteSocket(pSocket, (char *)ptr, (size_t)nleft);
|
|
||||||
if (nwritten <= 0) {
|
|
||||||
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
|
|
||||||
continue;
|
|
||||||
else
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
nleft -= nwritten;
|
|
||||||
ptr += nwritten;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (errno == SIGPIPE || errno == EPIPE) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return (nbytes - nleft);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int32_t nleft, nread;
|
|
||||||
char *ptr = (char *)buf;
|
|
||||||
|
|
||||||
nleft = nbytes;
|
|
||||||
|
|
||||||
while (nleft > 0) {
|
|
||||||
nread = taosReadSocket(pSocket, ptr, (size_t)nleft);
|
|
||||||
if (nread == 0) {
|
|
||||||
break;
|
|
||||||
} else if (nread < 0) {
|
|
||||||
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
nleft -= nread;
|
|
||||||
ptr += nread;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (errno == SIGPIPE || errno == EPIPE) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return (nbytes - nleft);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosNonblockwrite(TdSocketPtr pSocket, char *ptr, int32_t nbytes) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosSetNonblocking(pSocket, 1);
|
|
||||||
|
|
||||||
int32_t nleft, nwritten, nready;
|
|
||||||
fd_set fset;
|
|
||||||
struct timeval tv;
|
|
||||||
|
|
||||||
nleft = nbytes;
|
|
||||||
while (nleft > 0) {
|
|
||||||
tv.tv_sec = 30;
|
|
||||||
tv.tv_usec = 0;
|
|
||||||
FD_ZERO(&fset);
|
|
||||||
FD_SET(pSocket->fd, &fset);
|
|
||||||
if ((nready = select((SocketFd)(pSocket->fd + 1), NULL, &fset, NULL, &tv)) == 0) {
|
|
||||||
errno = ETIMEDOUT;
|
|
||||||
// printf("fd %d timeout, no enough space to write", fd);
|
|
||||||
break;
|
|
||||||
|
|
||||||
} else if (nready < 0) {
|
|
||||||
if (errno == EINTR) continue;
|
|
||||||
|
|
||||||
// printf("select error, %d (%s)", errno, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
nwritten = (int32_t)send(pSocket->fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
|
|
||||||
if (nwritten <= 0) {
|
|
||||||
if (errno == EAGAIN || errno == EINTR) continue;
|
|
||||||
|
|
||||||
// printf("write error, %d (%s)", errno, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
nleft -= nwritten;
|
|
||||||
ptr += nwritten;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosSetNonblocking(pSocket, 0);
|
|
||||||
|
|
||||||
return (nbytes - nleft);
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr taosOpenUdpSocket(uint32_t ip, uint16_t port) {
|
|
||||||
struct sockaddr_in localAddr;
|
|
||||||
SocketFd fd;
|
|
||||||
int32_t bufSize = 1024000;
|
|
||||||
|
|
||||||
// printf("open udp socket:0x%x:%hu", ip, port);
|
|
||||||
|
|
||||||
memset((char *)&localAddr, 0, sizeof(localAddr));
|
|
||||||
localAddr.sin_family = AF_INET;
|
|
||||||
localAddr.sin_addr.s_addr = ip;
|
|
||||||
localAddr.sin_port = (uint16_t)htons(port);
|
|
||||||
|
|
||||||
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
|
|
||||||
// printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
|
|
||||||
if (pSocket == NULL) {
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pSocket->fd = fd;
|
|
||||||
pSocket->refId = 0;
|
|
||||||
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
|
||||||
// printf("failed to set the send buffer size for UDP socket\n");
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
|
||||||
// printf("failed to set the receive buffer size for UDP socket\n");
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* bind socket to local address */
|
|
||||||
if (bind(pSocket->fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
|
|
||||||
// printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
|
|
||||||
SocketFd fd = -1;
|
|
||||||
int32_t ret;
|
|
||||||
struct sockaddr_in serverAddr, clientAddr;
|
|
||||||
int32_t bufSize = 1024 * 1024;
|
|
||||||
|
|
||||||
fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
||||||
|
|
||||||
if (fd <= 2) {
|
|
||||||
// printf("failed to open the socket: %d (%s)", errno, strerror(errno));
|
|
||||||
if (fd >= 0) taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
|
|
||||||
if (pSocket == NULL) {
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pSocket->fd = fd;
|
|
||||||
pSocket->refId = 0;
|
|
||||||
|
|
||||||
/* set REUSEADDR option, so the portnumber can be re-used */
|
|
||||||
int32_t reuse = 1;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
|
|
||||||
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
|
||||||
// printf("failed to set the send buffer size for TCP socket\n");
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
|
|
||||||
// printf("failed to set the receive buffer size for TCP socket\n");
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (clientIp != 0) {
|
|
||||||
memset((char *)&clientAddr, 0, sizeof(clientAddr));
|
|
||||||
clientAddr.sin_family = AF_INET;
|
|
||||||
clientAddr.sin_addr.s_addr = clientIp;
|
|
||||||
clientAddr.sin_port = 0;
|
|
||||||
|
|
||||||
/* bind socket to client address */
|
|
||||||
if (bind(pSocket->fd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
|
|
||||||
// printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
|
|
||||||
// strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
memset((char *)&serverAddr, 0, sizeof(serverAddr));
|
|
||||||
serverAddr.sin_family = AF_INET;
|
|
||||||
serverAddr.sin_addr.s_addr = destIp;
|
|
||||||
serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);
|
|
||||||
|
|
||||||
#ifdef _TD_LINUX
|
|
||||||
taosSetNonblocking(pSocket, 1);
|
|
||||||
ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
|
|
||||||
if (ret == -1) {
|
|
||||||
if (errno == EHOSTUNREACH) {
|
|
||||||
// printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
} else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
struct pollfd wfd[1];
|
|
||||||
|
|
||||||
wfd[0].fd = pSocket->fd;
|
|
||||||
wfd[0].events = POLLOUT;
|
|
||||||
|
|
||||||
int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
|
|
||||||
if (res == -1 || res == 0) {
|
|
||||||
// printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
|
|
||||||
taosCloseSocket(&pSocket); //
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int optVal = -1, optLen = sizeof(int);
|
|
||||||
if ((0 != taosGetSockOpt(pSocket, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
|
|
||||||
// printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
|
|
||||||
taosCloseSocket(&pSocket); //
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
ret = 0;
|
|
||||||
} else { // Other error
|
|
||||||
// printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
|
|
||||||
taosCloseSocket(&pSocket); //
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosSetNonblocking(pSocket, 0);
|
|
||||||
|
|
||||||
#else
|
|
||||||
ret = connect(pSocket->fd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (ret != 0) {
|
|
||||||
// printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
} else {
|
|
||||||
if (taosKeepTcpAlive(pSocket) == -1) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosKeepTcpAlive(TdSocketPtr pSocket) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int32_t alive = 1;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
|
|
||||||
// printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifndef __APPLE__
|
|
||||||
// all fails on macosx
|
|
||||||
#ifdef TCP_KEEPCNT
|
|
||||||
int32_t probes = 3;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
|
|
||||||
// printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef TCP_KEEPIDLE
|
|
||||||
int32_t alivetime = 10;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
|
|
||||||
// printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef TCP_KEEPINTVL
|
|
||||||
int32_t interval = 3;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
|
|
||||||
// printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
#endif // __APPLE__
|
|
||||||
|
|
||||||
int32_t nodelay = 1;
|
|
||||||
if (taosSetSockOpt(pSocket, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
|
|
||||||
// printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct linger linger = {0};
|
|
||||||
linger.l_onoff = 1;
|
|
||||||
linger.l_linger = 3;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
|
|
||||||
// printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int taosGetLocalIp(const char *eth, char *ip) {
|
|
||||||
#if defined(WINDOWS)
|
|
||||||
// DO NOTHAING
|
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
int fd;
|
|
||||||
struct ifreq ifr;
|
|
||||||
struct sockaddr_in sin;
|
|
||||||
|
|
||||||
fd = socket(AF_INET, SOCK_DGRAM, 0);
|
|
||||||
if (-1 == fd) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
strncpy(ifr.ifr_name, eth, IFNAMSIZ);
|
|
||||||
ifr.ifr_name[IFNAMSIZ - 1] = 0;
|
|
||||||
|
|
||||||
if (ioctl(fd, SIOCGIFADDR, &ifr) < 0) {
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
memcpy(&sin, &ifr.ifr_addr, sizeof(sin));
|
|
||||||
taosInetNtoa(sin.sin_addr, ip, 64);
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
int taosValidIp(uint32_t ip) {
|
|
||||||
#if defined(WINDOWS)
|
|
||||||
// DO NOTHAING
|
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
int ret = -1;
|
|
||||||
int fd;
|
|
||||||
|
|
||||||
struct ifconf ifconf;
|
|
||||||
|
|
||||||
char buf[512] = {0};
|
|
||||||
ifconf.ifc_len = 512;
|
|
||||||
ifconf.ifc_buf = buf;
|
|
||||||
|
|
||||||
if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
ioctl(fd, SIOCGIFCONF, &ifconf);
|
|
||||||
struct ifreq *ifreq = (struct ifreq *)ifconf.ifc_buf;
|
|
||||||
for (int i = (ifconf.ifc_len / sizeof(struct ifreq)); i > 0; i--) {
|
|
||||||
char ip_str[64] = {0};
|
|
||||||
if (ifreq->ifr_flags == AF_INET) {
|
|
||||||
ret = taosGetLocalIp(ifreq->ifr_name, ip_str);
|
|
||||||
if (ret != 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ret = -1;
|
|
||||||
if (ip == (uint32_t)taosInetAddr(ip_str)) {
|
|
||||||
ret = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ifreq++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return ret;
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif // endif 0
|
|
||||||
|
|
||||||
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
||||||
struct sockaddr_in serverAdd;
|
struct sockaddr_in serverAdd;
|
||||||
SocketFd fd;
|
SocketFd fd;
|
||||||
|
@ -778,138 +202,19 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
|
||||||
TAOS_SKIP_ERROR(taosCloseSocket(&pSocket));
|
TAOS_SKIP_ERROR(taosCloseSocket(&pSocket));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* bind socket to server address */
|
/* bind socket to server address */
|
||||||
if (-1 == bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd))) {
|
if (-1 == bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd))) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
TAOS_SKIP_ERROR(taosCloseSocket(&pSocket));
|
TAOS_SKIP_ERROR(taosCloseSocket(&pSocket));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_SKIP_ERROR(taosCloseSocket(&pSocket));
|
TAOS_SKIP_ERROR(taosCloseSocket(&pSocket));
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
TdSocketServerPtr taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
|
||||||
struct sockaddr_in serverAdd;
|
|
||||||
SocketFd fd;
|
|
||||||
int32_t reuse;
|
|
||||||
|
|
||||||
// printf("open tcp server socket:0x%x:%hu", ip, port);
|
|
||||||
|
|
||||||
bzero((char *)&serverAdd, sizeof(serverAdd));
|
|
||||||
serverAdd.sin_family = AF_INET;
|
|
||||||
serverAdd.sin_addr.s_addr = ip;
|
|
||||||
serverAdd.sin_port = (uint16_t)htons(port);
|
|
||||||
|
|
||||||
if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
|
|
||||||
// printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
|
|
||||||
if (pSocket == NULL) {
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pSocket->refId = 0;
|
|
||||||
pSocket->fd = fd;
|
|
||||||
|
|
||||||
/* set REUSEADDR option, so the portnumber can be re-used */
|
|
||||||
reuse = 1;
|
|
||||||
if (taosSetSockOpt(pSocket, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
|
|
||||||
// printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* bind socket to server address */
|
|
||||||
if (bind(pSocket->fd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
|
|
||||||
// printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosKeepTcpAlive(pSocket) < 0) {
|
|
||||||
// printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (listen(pSocket->fd, 1024) < 0) {
|
|
||||||
// printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
|
|
||||||
taosCloseSocket(&pSocket);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (TdSocketServerPtr)pSocket;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr taosAcceptTcpConnectSocket(TdSocketServerPtr pServerSocket, struct sockaddr *destAddr, int *addrLen) {
|
|
||||||
if (pServerSocket == NULL || pServerSocket->fd < 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
SocketFd fd = accept(pServerSocket->fd, destAddr, addrLen);
|
|
||||||
if (fd == -1) {
|
|
||||||
// tError("TCP accept failure(%s)", strerror(errno));
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TdSocketPtr pSocket = (TdSocketPtr)taosMemoryMalloc(sizeof(TdSocket));
|
|
||||||
if (pSocket == NULL) {
|
|
||||||
taosCloseSocketNoCheck1(fd);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pSocket->fd = fd;
|
|
||||||
pSocket->refId = 0;
|
|
||||||
return pSocket;
|
|
||||||
}
|
|
||||||
#define COPY_SIZE 32768
|
|
||||||
// sendfile shall be used
|
|
||||||
|
|
||||||
int64_t taosCopyFds(TdSocketPtr pSrcSocket, TdSocketPtr pDestSocket, int64_t len) {
|
|
||||||
if (pSrcSocket == NULL || pSrcSocket->fd < 0 || pDestSocket == NULL || pDestSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int64_t leftLen;
|
|
||||||
int64_t readLen, writeLen;
|
|
||||||
char temp[COPY_SIZE];
|
|
||||||
|
|
||||||
leftLen = len;
|
|
||||||
|
|
||||||
while (leftLen > 0) {
|
|
||||||
if (leftLen < COPY_SIZE)
|
|
||||||
readLen = leftLen;
|
|
||||||
else
|
|
||||||
readLen = COPY_SIZE; // 4K
|
|
||||||
|
|
||||||
int64_t retLen = taosReadMsg(pSrcSocket, temp, (int32_t)readLen);
|
|
||||||
if (readLen != retLen) {
|
|
||||||
// printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
|
|
||||||
// readLen, retLen, len, leftLen, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
writeLen = taosWriteMsg(pDestSocket, temp, (int32_t)readLen);
|
|
||||||
|
|
||||||
if (readLen != writeLen) {
|
|
||||||
// printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
|
|
||||||
// readLen, writeLen, len, leftLen, strerror(errno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
leftLen -= readLen;
|
|
||||||
}
|
|
||||||
|
|
||||||
return len;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Function converting an IP address string to an uint32_t.
|
|
||||||
|
|
||||||
#endif // endif 0
|
|
||||||
|
|
||||||
int32_t taosBlockSIGPIPE() {
|
int32_t taosBlockSIGPIPE() {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1029,7 +334,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
||||||
// hints.ai_family = AF_INET;
|
// hints.ai_family = AF_INET;
|
||||||
strcpy(fqdn, hostname);
|
strcpy(fqdn, hostname);
|
||||||
strcpy(fqdn + strlen(hostname), ".local");
|
strcpy(fqdn + strlen(hostname), ".local");
|
||||||
#else // linux
|
#else // linux
|
||||||
|
|
||||||
#endif // linux
|
#endif // linux
|
||||||
|
|
||||||
|
@ -1048,7 +353,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(ret);
|
terrno = TAOS_SYSTEM_ERROR(ret);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -1067,7 +372,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
||||||
|
|
||||||
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
//fprintf(stderr, "failed to get fqdn, code:%d, hostname:%s, reason:%s\n", ret, hostname, gai_strerror(ret));
|
// fprintf(stderr, "failed to get fqdn, code:%d, hostname:%s, reason:%s\n", ret, hostname, gai_strerror(ret));
|
||||||
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||||
}
|
}
|
||||||
strcpy(fqdn, result->ai_canonname);
|
strcpy(fqdn, result->ai_canonname);
|
||||||
|
@ -1082,41 +387,16 @@ void tinet_ntoa(char *ipstr, uint32_t ip) {
|
||||||
(void)sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
|
(void)sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosIgnSIGPIPE() {
|
int32_t taosIgnSIGPIPE() {
|
||||||
sighandler_t h = signal(SIGPIPE, SIG_IGN);
|
sighandler_t h = signal(SIGPIPE, SIG_IGN);
|
||||||
if (SIG_ERR == h) {
|
if (SIG_ERR == h) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
int32_t taosSetMaskSIGPIPE() {
|
|
||||||
#ifdef WINDOWS
|
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
sigset_t signal_mask;
|
|
||||||
(void)sigemptyset(&signal_mask);
|
|
||||||
(void)sigaddset(&signal_mask, SIGPIPE);
|
|
||||||
|
|
||||||
int32_t rc = pthread_sigmask(SIG_SETMASK, &signal_mask, NULL);
|
|
||||||
if (rc != 0) {
|
|
||||||
// printf("failed to setmask SIGPIPE");
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *addrLen) {
|
|
||||||
if (pSocket == NULL || pSocket->fd < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return getsockname(pSocket->fd, destAddr, addrLen);
|
|
||||||
}
|
|
||||||
#endif // endif 0
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set TCP connection timeout per-socket level.
|
* Set TCP connection timeout per-socket level.
|
||||||
* ref [https://github.com/libuv/help/issues/54]
|
* ref [https://github.com/libuv/help/issues/54]
|
||||||
|
@ -1132,7 +412,7 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined(WINDOWS)
|
#if defined(WINDOWS)
|
||||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
|
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
|
||||||
taosCloseSocketNoCheck1(fd);
|
taosCloseSocketNoCheck1(fd);
|
||||||
|
|
|
@ -0,0 +1,368 @@
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
import taos
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from frame.log import *
|
||||||
|
from frame.cases import *
|
||||||
|
from frame.sql import *
|
||||||
|
from frame.caseBase import *
|
||||||
|
from frame import *
|
||||||
|
from frame.srvCtl import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase(TBase):
|
||||||
|
"""
|
||||||
|
Description: This class is used to verify the feature of 'drop table by uid' for task TS-5111
|
||||||
|
FS: https://taosdata.feishu.cn/wiki/JgeDwZkH3iTNv2ksVkWcHenKnTf
|
||||||
|
TS: https://taosdata.feishu.cn/wiki/DX3FwopwGiXCeRkBNXFcj0MBnnb
|
||||||
|
create:
|
||||||
|
2024-09-23 created by Charles
|
||||||
|
update:
|
||||||
|
None
|
||||||
|
"""
|
||||||
|
|
||||||
|
class TableType(Enum):
|
||||||
|
STABLE = 0
|
||||||
|
CHILD_TABLE = 1
|
||||||
|
REGULAR_TABLE = 2
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
"""Initialize the TDengine cluster environment
|
||||||
|
"""
|
||||||
|
super(TDTestCase, self).init(conn, logSql, replicaVar, db="db")
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def get_uid_by_db_table_name(self, db_name, table_name, table_type=TableType.STABLE):
|
||||||
|
"""Get table uid with db name and table name from system table
|
||||||
|
:param db_name: database name
|
||||||
|
:param table_name: table name
|
||||||
|
:param table_type: table type, default is stable
|
||||||
|
:return: table uid
|
||||||
|
"""
|
||||||
|
if table_type == self.TableType.STABLE:
|
||||||
|
tdSql.query(f"select * from information_schema.ins_stables where db_name='{db_name}' and stable_name like '%{table_name}%';")
|
||||||
|
elif table_type == self.TableType.CHILD_TABLE:
|
||||||
|
tdSql.query(f"select * from information_schema.ins_tables where db_name='{db_name}' and table_name like '%{table_name}%' and stable_name is not null;")
|
||||||
|
else:
|
||||||
|
tdSql.query(f"select * from information_schema.ins_tables where db_name='{db_name}' and table_name like '%{table_name}%' and stable_name is null;")
|
||||||
|
# check whether the table uid is empty
|
||||||
|
if len(tdSql.res) == 0:
|
||||||
|
tdLog.debug(f"Can't get table uid with db name: {db_name} and table name: {table_name}")
|
||||||
|
return None
|
||||||
|
# get table uid list
|
||||||
|
if table_type == self.TableType.STABLE:
|
||||||
|
return [item[10] for item in tdSql.res]
|
||||||
|
else:
|
||||||
|
return [item[5] for item in tdSql.res]
|
||||||
|
|
||||||
|
def get_uid_by_db_name(self, db_name, table_type=TableType.STABLE):
|
||||||
|
"""Get table uid with db name and table type from system table
|
||||||
|
:param db_name: database name
|
||||||
|
:param table_type: table type, default is stable
|
||||||
|
:return: table uid list
|
||||||
|
"""
|
||||||
|
if table_type == self.TableType.STABLE:
|
||||||
|
tdSql.query(f"select * from information_schema.ins_stables where db_name='{db_name}';")
|
||||||
|
elif table_type == self.TableType.CHILD_TABLE:
|
||||||
|
tdSql.query(f"select * from information_schema.ins_tables where db_name='{db_name}' and stable_name is not null;")
|
||||||
|
else:
|
||||||
|
tdSql.query(f"select * from information_schema.ins_tables where db_name='{db_name}' and stable_name is null;")
|
||||||
|
# check whether the table uid is empty
|
||||||
|
if len(tdSql.res) == 0:
|
||||||
|
tdLog.debug(f"Can't get table uid with db name: {db_name}")
|
||||||
|
return None
|
||||||
|
# get table uid list
|
||||||
|
if table_type == self.TableType.STABLE:
|
||||||
|
return [item[10] for item in tdSql.res]
|
||||||
|
else:
|
||||||
|
return [item[5] for item in tdSql.res]
|
||||||
|
|
||||||
|
def drop_table_by_uid(self, uid_list, table_type=TableType.STABLE, exist_ops=False):
|
||||||
|
"""Drop the specified tables by uid list
|
||||||
|
:db_name: database name
|
||||||
|
:param uid_list: table uid list to be dropped
|
||||||
|
:param exist_ops: whether to use exist option, default is False
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
# check whether the uid list is empty
|
||||||
|
if len(uid_list) == 0:
|
||||||
|
return
|
||||||
|
# drop table by uid
|
||||||
|
if exist_ops and table_type == self.TableType.STABLE:
|
||||||
|
for uid in uid_list:
|
||||||
|
tdSql.execute(f"drop stable with if exists `{uid}`;")
|
||||||
|
else:
|
||||||
|
uids = ','.join(["`" + str(item) + "`" for item in uid_list])
|
||||||
|
tdSql.execute(f"drop table with {uids};")
|
||||||
|
|
||||||
|
def test_drop_single_table_by_uid(self):
|
||||||
|
"""Verify the feature of dropping a single stable/child table/regular table by uid with root user
|
||||||
|
"""
|
||||||
|
db_name = "test_drop_single_table_by_uid"
|
||||||
|
tdLog.info("Start test case: test_drop_single_table_by_uid")
|
||||||
|
# data for case test_drop_single_table_by_uid
|
||||||
|
tdLog.info("Prepare data for test case test_drop_single_table_by_uid")
|
||||||
|
tdSql.execute(f"create database {db_name};")
|
||||||
|
tdSql.execute(f"use {db_name};")
|
||||||
|
# table with normal characters
|
||||||
|
tdSql.execute("create stable st1 (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute("create table ct1_1 using st1 tags(1);")
|
||||||
|
tdSql.execute("create table t1 (ts timestamp, c1 int, c2 float);")
|
||||||
|
tdLog.info("Finish preparing data for test case test_drop_single_table_by_uid of normal characters")
|
||||||
|
# get table uid
|
||||||
|
uid_st1 = self.get_uid_by_db_table_name(db_name, "st1")
|
||||||
|
tdLog.debug(f"uid_st1: {uid_st1}")
|
||||||
|
uid_ct1_1 = self.get_uid_by_db_table_name(db_name, "ct1_1", self.TableType.CHILD_TABLE)
|
||||||
|
tdLog.debug(f"uid_ct1_1: {uid_ct1_1}")
|
||||||
|
uid_t1 = self.get_uid_by_db_table_name(db_name, "t1", self.TableType.REGULAR_TABLE)
|
||||||
|
tdLog.debug(f"uid_t1: {uid_t1}")
|
||||||
|
# drop table by uid
|
||||||
|
self.drop_table_by_uid(uid_t1, self.TableType.REGULAR_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_ct1_1, self.TableType.CHILD_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_st1, self.TableType.STABLE, True)
|
||||||
|
|
||||||
|
# table with special characters
|
||||||
|
tdSql.execute("create stable `st2\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute("create table `ct2_1\u00cf\u00ff` using `st2\u00bf\u200bfnn1` tags(1);")
|
||||||
|
tdSql.execute("create table `t2\u00ef\u00fa` (ts timestamp, c1 int, c2 float);")
|
||||||
|
tdLog.info("Finish preparing data for test case test_drop_single_table_by_uid of special characters")
|
||||||
|
# get table uid
|
||||||
|
uid_st2 = self.get_uid_by_db_table_name(db_name, "st2")
|
||||||
|
tdLog.debug(f"uid_st2: {uid_st2}")
|
||||||
|
uid_ct2_1 = self.get_uid_by_db_table_name(db_name, "ct2_1", self.TableType.CHILD_TABLE)
|
||||||
|
tdLog.debug(f"uid_ct2_1: {uid_ct2_1}")
|
||||||
|
uid_t2 = self.get_uid_by_db_table_name(db_name, "t2", self.TableType.REGULAR_TABLE)
|
||||||
|
tdLog.debug(f"uid_t2: {uid_t2}")
|
||||||
|
# drop table by uid
|
||||||
|
self.drop_table_by_uid(uid_t2, self.TableType.REGULAR_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_ct2_1, self.TableType.CHILD_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_st2, self.TableType.STABLE, True)
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
tdLog.info("Finish test case: test_drop_single_table_by_uid")
|
||||||
|
|
||||||
|
def test_drop_multiple_tables_by_uid(self):
|
||||||
|
"""Verify the feature of dropping multiple tables by uid with root user
|
||||||
|
"""
|
||||||
|
db_name = "test_drop_multiple_tables_by_uid"
|
||||||
|
table_number = 100
|
||||||
|
tdLog.info("Start test case: test_drop_multiple_tables_by_uid")
|
||||||
|
# data for case test_drop_multiple_tables_by_uid
|
||||||
|
tdLog.info("Prepare data for test case test_drop_multiple_tables_by_uid")
|
||||||
|
tdSql.execute(f"create database {db_name};")
|
||||||
|
tdSql.execute(f"use {db_name};")
|
||||||
|
# table with normal characters
|
||||||
|
for i in range(table_number):
|
||||||
|
tdSql.execute(f"create stable st{i} (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute(f"create table ct{i}_{i} using st{i} tags({i+1});")
|
||||||
|
tdSql.execute(f"create table t{i} (ts timestamp, c1 int, c2 float);")
|
||||||
|
tdLog.info("Finish preparing data for test case test_drop_multiple_tables_by_uid of normal characters")
|
||||||
|
# get table uid
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, "st")
|
||||||
|
# tdLog.debug(f"Get multiple stable uid list: {uid_st}")
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, "ct", self.TableType.CHILD_TABLE)
|
||||||
|
# tdLog.debug(f"Get multiple child table uid list: {uid_ct}")
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, "t", self.TableType.REGULAR_TABLE)
|
||||||
|
# tdLog.debug(f"Get multiple regular table uid list: {uid_t}")
|
||||||
|
# drop table by uid
|
||||||
|
self.drop_table_by_uid(uid_t, self.TableType.REGULAR_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_ct, self.TableType.CHILD_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_st, self.TableType.STABLE, True)
|
||||||
|
|
||||||
|
# table with special characters
|
||||||
|
for i in range(table_number):
|
||||||
|
tdSql.execute(f"create stable `st{i}\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute(f"create table `ct{i}_{i}\u00cf\u00ff` using `st{i}\u00bf\u200bfnn1` tags(1);")
|
||||||
|
tdSql.execute(f"create table `t{i}\u00ef\u00fa` (ts timestamp, c1 int, c2 float);")
|
||||||
|
# get table uid
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, "st")
|
||||||
|
# tdLog.debug(f"Get multiple stable uid list: {uid_st}")
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, "ct", self.TableType.CHILD_TABLE)
|
||||||
|
# tdLog.debug(f"Get multiple child table uid list: {uid_ct}")
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, "t", self.TableType.REGULAR_TABLE)
|
||||||
|
# tdLog.debug(f"Get multiple regular table uid list: {uid_t}")
|
||||||
|
# drop table by uid
|
||||||
|
self.drop_table_by_uid(uid_t, self.TableType.REGULAR_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_ct, self.TableType.CHILD_TABLE)
|
||||||
|
self.drop_table_by_uid(uid_st, self.TableType.STABLE, True)
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
tdLog.info("Finish test case: test_drop_multiple_tables_by_uid")
|
||||||
|
|
||||||
|
def test_uid_as_table_name(self):
|
||||||
|
"""Verify using uid as table name, drop table with uid doesn't affect other tables
|
||||||
|
"""
|
||||||
|
db_name = "test_uid_as_table_name"
|
||||||
|
tdLog.info("Start test case: test_uid_as_table_name")
|
||||||
|
# data for case test_uid_as_table_name
|
||||||
|
tdLog.info("Prepare data for test case test_uid_as_table_name")
|
||||||
|
tdSql.execute(f"create database {db_name};")
|
||||||
|
tdSql.execute(f"use {db_name};")
|
||||||
|
# super table
|
||||||
|
tdSql.execute(f"create stable `st1\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, "st")
|
||||||
|
tdSql.execute(f"create stable `{uid_st[0]}` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
self.drop_table_by_uid(uid_st, self.TableType.STABLE, True)
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, str(uid_st[0]))
|
||||||
|
assert uid_st is not None
|
||||||
|
tdLog.info(f"Drop stable with special characters with uid {uid_st[0]}, stable named as {uid_st[0]} doesn't be affected")
|
||||||
|
# child table
|
||||||
|
tdSql.execute(f"create stable `st2\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute(f"create table `ct2_1\u00cf\u00ff` using `st2\u00bf\u200bfnn1` tags(1);")
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, "ct", self.TableType.CHILD_TABLE)
|
||||||
|
tdSql.execute(f"create table `{uid_ct[0]}` using `st2\u00bf\u200bfnn1` tags(2);")
|
||||||
|
self.drop_table_by_uid(uid_ct, self.TableType.CHILD_TABLE)
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, str(uid_ct[0]), self.TableType.CHILD_TABLE)
|
||||||
|
assert uid_ct is not None
|
||||||
|
tdLog.info(f"Drop child table with special characters with uid {uid_ct[0]}, child table named as {uid_ct[0]} doesn't be affected")
|
||||||
|
# regular table
|
||||||
|
tdSql.execute(f"create table `t2\u00bf\u200bfnn1` (ts timestamp, c1 int);")
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, "t2", self.TableType.REGULAR_TABLE)
|
||||||
|
tdSql.execute(f"create table `{uid_t[0]}` (ts timestamp, c1 int);")
|
||||||
|
self.drop_table_by_uid(uid_t, self.TableType.REGULAR_TABLE)
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, str(uid_t[0]), self.TableType.REGULAR_TABLE)
|
||||||
|
assert uid_t is not None
|
||||||
|
tdLog.info(f"Drop regular table with special characters with uid {uid_t[0]}, regular table named as {uid_t[0]} doesn't be affected")
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
tdLog.info("Finish test case: test_uid_as_table_name")
|
||||||
|
|
||||||
|
def test_abnormal_non_exist_uid(self):
|
||||||
|
"""Verify dropping table with non-exist uid
|
||||||
|
"""
|
||||||
|
db_name = "test_abnormal_non_exist_uid"
|
||||||
|
tdLog.info("Start test case: test_abnormal_non_exist_uid")
|
||||||
|
# data for case test_abnormal_non_exist_uid
|
||||||
|
tdLog.info("Prepare data for test case test_abnormal_non_exist_uid")
|
||||||
|
tdSql.execute(f"create database {db_name};")
|
||||||
|
tdSql.execute(f"use {db_name};")
|
||||||
|
# drop table with non-exist uid
|
||||||
|
tdSql.error(f"drop stable with if exists `1234567890`;", expectErrInfo="STable not exist:")
|
||||||
|
tdSql.error(f"drop table with `1234567890`;", expectErrInfo="Table does not exist:")
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
tdLog.info("Finish test case: test_abnormal_non_exist_uid")
|
||||||
|
|
||||||
|
def test_abnormal_incorrect_table_type(self):
|
||||||
|
"""Verify dropping table with incorrect sql, like drop stable sql with table or child table uid
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
db_name = "test_abnormal_incorrect_table_type"
|
||||||
|
tdLog.info("Start test case: test_abnormal_incorrect_table_type")
|
||||||
|
# data for case test_abnormal_incorrect_table_type
|
||||||
|
tdLog.info("Prepare data for test case test_abnormal_incorrect_table_type")
|
||||||
|
tdSql.execute(f"create database {db_name};")
|
||||||
|
tdSql.execute(f"use {db_name};")
|
||||||
|
tdSql.execute("create stable `st3\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute("create table `ct3_1\u00cf\u00ff` using `st3\u00bf\u200bfnn1` tags(1);")
|
||||||
|
tdSql.execute("create table `t3\u00ef\u00fa` (ts timestamp, c1 int, c2 float);")
|
||||||
|
tdLog.info("Finish preparing data for test case test_abnormal_incorrect_table_type of special characters")
|
||||||
|
# get table uid
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, "st")
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, "ct", self.TableType.CHILD_TABLE)
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, "t", self.TableType.REGULAR_TABLE)
|
||||||
|
# drop table with incorrect sql
|
||||||
|
tdSql.error(f"drop stable with `{uid_ct[0]}`;", expectErrInfo="STable not exist")
|
||||||
|
tdSql.error(f"drop stable with `{uid_t[0]}`;", expectErrInfo="STable not exist")
|
||||||
|
tdLog.info("Finish test case: test_abnormal_incorrect_table_type")
|
||||||
|
except Exception as e:
|
||||||
|
tdLog.exit("Failed to run test case test_abnormal_incorrect_table_type with msg: %s" % str(e))
|
||||||
|
finally:
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
|
||||||
|
def test_abnormal_mixed_uid(self):
|
||||||
|
"""Verify dropping table with mixed uid
|
||||||
|
"""
|
||||||
|
db_name = "test_abnormal_mixed_uid"
|
||||||
|
tdLog.info("Start test case: test_abnormal_mixed_uid")
|
||||||
|
# data for case test_abnormal_mixed_uidF
|
||||||
|
tdLog.info("Prepare data for test case test_abnormal_mixed_uid")
|
||||||
|
tdSql.execute(f"create database {db_name};")
|
||||||
|
tdSql.execute(f"use {db_name};")
|
||||||
|
tdSql.execute("create stable `st3\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
tdSql.execute("create table `ct3_1\u00cf\u00ff` using `st3\u00bf\u200bfnn1` tags(1);")
|
||||||
|
tdSql.execute("create table `t3\u00ef\u00fa` (ts timestamp, c1 int, c2 float);")
|
||||||
|
tdLog.info("Finish preparing data for test case test_abnormal_mixed_uid of special characters")
|
||||||
|
# get table uid
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, "st")
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, "ct", self.TableType.CHILD_TABLE)
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, "t", self.TableType.REGULAR_TABLE)
|
||||||
|
# drop table with incorrect sql
|
||||||
|
tdSql.error(f"drop stable with `{uid_st[0]}`,`{uid_ct[0]}`;", expectErrInfo="syntax error")
|
||||||
|
tdSql.error(f"drop table with `{uid_st[0]}`,`{uid_ct[0]}`,`{uid_t[0]}`;", expectErrInfo="Cannot drop super table in batch")
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
tdLog.info("Finish test case: test_abnormal_mixed_uid")
|
||||||
|
|
||||||
|
def test_abnormal_system_tables(self):
|
||||||
|
"""Verify dropping system tables
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
uid_list = self.get_uid_by_db_name("information_schema", self.TableType.REGULAR_TABLE)
|
||||||
|
uid = random.choice(uid_list)
|
||||||
|
assert uid is None
|
||||||
|
except Exception as e:
|
||||||
|
tdLog.exit("Failed to run test case test_abnormal_system_tables with msg: %s" % str(e))
|
||||||
|
|
||||||
|
def test_abnormal_drop_table_with_non_root_user(self):
|
||||||
|
"""Verify dropping table with non-root user
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# create new user and grant create database priviledge
|
||||||
|
tdSql.execute("create user test pass 'test';")
|
||||||
|
tdSql.execute("alter user test createdb 1;")
|
||||||
|
conn = taos.connect(user="test", password="test")
|
||||||
|
cursor = conn.cursor()
|
||||||
|
# create database and tables with new user
|
||||||
|
tdLog.info("Prepare data for test case test_abnormal_drop_table_with_non_root_user")
|
||||||
|
db_name = "test_abnormal_drop_table_with_non_root_user"
|
||||||
|
cursor.execute(f"create database {db_name};")
|
||||||
|
cursor.execute(f"use {db_name};")
|
||||||
|
time.sleep(3)
|
||||||
|
cursor.execute("create stable `st4\u00bf\u200bfnn1` (ts timestamp, c1 int) tags (t1 int);")
|
||||||
|
cursor.execute("create table `ct4_1\u00cf\u00ff` using `st4\u00bf\u200bfnn1` tags(1);")
|
||||||
|
cursor.execute("create table `t4\u00ef\u00fa` (ts timestamp, c1 int, c2 float);")
|
||||||
|
tdLog.info("Finish preparing data for test case test_abnormal_drop_table_with_non_root_user of special characters")
|
||||||
|
# get table uid
|
||||||
|
uid_st = self.get_uid_by_db_table_name(db_name, "st")
|
||||||
|
uid_ct = self.get_uid_by_db_table_name(db_name, "ct", self.TableType.CHILD_TABLE)
|
||||||
|
uid_t = self.get_uid_by_db_table_name(db_name, "t", self.TableType.REGULAR_TABLE)
|
||||||
|
# drop stable with sql by non-root user
|
||||||
|
try:
|
||||||
|
cursor.execute(f"drop stable with `{uid_st[0]}`;")
|
||||||
|
except Exception as e:
|
||||||
|
assert "Permission denied or target object not exist" in str(e)
|
||||||
|
tdLog.info("Drop stable with non-root user failed as expected")
|
||||||
|
# drop child table with sql by non-root user
|
||||||
|
try:
|
||||||
|
cursor.execute(f"drop table with `{uid_ct[0]}`;")
|
||||||
|
except Exception as e:
|
||||||
|
assert "Permission denied or target object not exist" in str(e)
|
||||||
|
tdLog.info("Drop child table with non-root user failed as expected")
|
||||||
|
# drop regular table with sql by non-root user
|
||||||
|
try:
|
||||||
|
cursor.execute(f"drop table with `{uid_t[0]}`;")
|
||||||
|
except Exception as e:
|
||||||
|
assert "Permission denied or target object not exist" in str(e)
|
||||||
|
tdLog.info("Drop regular table with non-root user failed as expected")
|
||||||
|
tdLog.info("Finish test case: test_abnormal_drop_table_with_non_root_user")
|
||||||
|
except Exception as e:
|
||||||
|
tdLog.exit("Failed to run test case test_abnormal_drop_table_with_non_root_user with msg: %s" % str(e))
|
||||||
|
finally:
|
||||||
|
tdSql.execute(f"drop database {db_name};")
|
||||||
|
tdSql.execute("drop user test;")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
# normal cases
|
||||||
|
self.test_drop_single_table_by_uid()
|
||||||
|
self.test_drop_multiple_tables_by_uid()
|
||||||
|
self.test_uid_as_table_name()
|
||||||
|
# abnormal cases
|
||||||
|
self.test_abnormal_non_exist_uid()
|
||||||
|
self.test_abnormal_incorrect_table_type()
|
||||||
|
self.test_abnormal_mixed_uid()
|
||||||
|
self.test_abnormal_system_tables()
|
||||||
|
self.test_abnormal_drop_table_with_non_root_user()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -23,6 +23,7 @@
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
|
,,y,army,./pytest.sh python3 ./test.py -f insert/test_column_tag_boundary.py
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
|
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_desc.py -N 3 -L 3 -D 2
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py
|
,,y,army,./pytest.sh python3 ./test.py -f query/fill/fill_null.py
|
||||||
|
,,y,army,./pytest.sh python3 ./test.py -f cluster/test_drop_table_by_uid.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
|
,,y,army,./pytest.sh python3 ./test.py -f cluster/incSnapshot.py -N 3
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f cluster/clusterBasic.py -N 5
|
,,y,army,./pytest.sh python3 ./test.py -f cluster/clusterBasic.py -N 5
|
||||||
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
|
,,y,army,./pytest.sh python3 ./test.py -f query/query_basic.py -N 3
|
||||||
|
|
Loading…
Reference in New Issue