Merge branch 'main' of https://github.com/taosdata/TDengine into fix/ly_res_main

This commit is contained in:
54liuyao 2024-10-29 14:14:44 +08:00
commit d8ff0b63d0
13 changed files with 335 additions and 114 deletions

View File

@ -223,11 +223,11 @@ lossyColumns float|double
| 参数名称 | 参数说明 |
| :--------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: |
| enableCoreFile | crash 时是否生成 core 文件0: 不生成1生成默认值 为 1; 不同的启动方式,生成 core 文件的目录如下1、systemctl start taosd 启动:生成的 core 在根目录下 <br/> 2、手动启动就在 taosd 执行目录下。 |
| udf | 是否启动 UDF 服务0: 不启动1启动默认值 为 0 |
| ttlChangeOnWrite | ttl 到期时间是否伴随表的修改操作改变; 0: 不改变1改变 ;默认值 为 |
| tmqMaxTopicNum | 订阅最多可建立的 topic 数量; 取值范围 1-10000缺省值 为20 |
| maxTsmaNum | 集群内可创建的TSMA个数取值范围0-3缺省值: 3 |
| enableCoreFile | crash 时是否生成 core 文件0: 不生成1生成默认值为 1; 不同的启动方式,生成 core 文件的目录如下1、systemctl start taosd 启动:生成的 core 在根目录下 <br/> 2、手动启动就在 taosd 执行目录下。 |
| udf | 是否启动 UDF 服务0: 不启动1启动默认值为 0 |
| ttlChangeOnWrite | ttl 到期时间是否伴随表的修改操作改变; 0: 不改变1改变;默认值为 0 |
| tmqMaxTopicNum | 订阅最多可建立的 topic 数量; 取值范围 1-10000缺省值为20 |
| maxTsmaNum | 集群内可创建的TSMA个数取值范围0-3缺省值为 3 |
## taosd 监控指标
@ -458,4 +458,3 @@ TDengine 的日志文件主要包括普通日志和慢日志两种类型。
3. 多个客户端的日志存储在相应日志路径下的同一个 taosSlowLog.yyyy.mm.dd 文件里。
4. 慢日志文件不自动删除,不压缩。
5. 使用和普通日志文件相同的三个参数 logDir, minimalLogDirGB, asyncLog。另外两个参数 numOfLogLineslogKeepDays 不适用于慢日志。

View File

@ -87,7 +87,7 @@ CREATE TABLE [IF NOT EXISTS] USING [db_name.]stb_name (field1_name [, field2_nam
**参数说明**
1. FILE 语法表示数据来自于 CSV 文件英文逗号分隔、英文单引号括住每个值CSV 文件无需表头。CSV 文件中应仅包含 table name 与 tag 值。如需插入数据,请参考数据写入章节。
1. FILE 语法表示数据来自于 CSV 文件英文逗号分隔、英文单引号括住每个值CSV 文件无需表头。CSV 文件中应仅包含 table name 与 tag 值。如需插入数据,请参考'数据写入'章节。
2. 为指定的 stb_name 创建子表,该超级表必须已经存在。
3. field_name 列表顺序与 CSV 文件各列内容顺序一致。列表中不允许出现重复项,且必须包含 `tbname`,可包含零个或多个超级表中已定义的标签列。未包含在列表中的标签值将被设置为 NULL。

View File

@ -47,7 +47,7 @@ INSERT INTO
2. VALUES 语法表示了要插入的一行或多行数据。
3. FILE 语法表示数据来自于 CSV 文件英文逗号分隔、英文单引号括住每个值CSV 文件无需表头。
3. FILE 语法表示数据来自于 CSV 文件英文逗号分隔、英文单引号括住每个值CSV 文件无需表头。如仅需创建子表,请参考'表'章节。
4. `INSERT ... VALUES` 语句和 `INSERT ... FILE` 语句均可以在一条 INSERT 语句中同时向多个表插入数据。
@ -154,12 +154,20 @@ INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/c
INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) FILE '/tmp/csvfile_21001.csv'
d21002 USING meters (groupId) TAGS (2) FILE '/tmp/csvfile_21002.csv';
```
## 超级表语法
## 向超级表插入数据并自动创建子表
自动建表, 表名通过tbname列指定
自动建表, 表名通过 tbname 列指定
```sql
INSERT INTO meters(tbname, location, groupId, ts, current, voltage, phase)
values('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
VALUES ('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:34.630', 10.2, 219, 0.32)
('d31001', 'California.SanFrancisco', 2, '2021-07-13 14:06:35.779', 10.15, 217, 0.33)
('d31002', NULL, 2, '2021-07-13 14:06:34.255', 10.15, 217, 0.33)
```
## 通过 CSV 文件向超级表插入数据并自动创建子表
根据 csv 文件内容,为 超级表创建子表,并填充相应 column 与 tag
```sql
INSERT INTO meters(tbname, location, groupId, ts, current, voltage, phase)
FILE '/tmp/csvfile_21002.csv'
```

View File

@ -36,12 +36,13 @@ typedef struct SVnodeMgmt {
SSingleWorker mgmtWorker;
SSingleWorker mgmtMultiWorker;
SHashObj *hash;
SHashObj *closedHash;
TdThreadRwlock lock;
SVnodesStat state;
STfs *pTfs;
TdThread thread;
bool stop;
TdThreadMutex createLock;
TdThreadMutex fileLock;
} SVnodeMgmt;
typedef struct {
@ -94,7 +95,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal);
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed);
// vmHandle.c
SArray *vmGetMsgHandles();
@ -111,6 +112,7 @@ int32_t vmProcessArbHeartBeatReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt);
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes);
// vmWorker.c
int32_t vmStartWorker(SVnodeMgmt *pMgmt);

View File

@ -19,6 +19,54 @@
#define MAX_CONTENT_LEN 2 * 1024 * 1024
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->hash);
int32_t closedSize = taosHashGetSize(pMgmt->closedHash);
size += closedSize;
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
return terrno;
}
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->hash, pIter);
} else {
taosHashCancelIterate(pMgmt->hash, pIter);
}
}
pIter = taosHashIterate(pMgmt->closedHash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->closedHash, pIter);
} else {
taosHashCancelIterate(pMgmt->closedHash, pIter);
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
*numOfVnodes = num;
*ppVnodes = pVnodes;
return 0;
}
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
@ -204,6 +252,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
int32_t lino = 0;
int32_t ret = -1;
int32_t nBytes = snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP);
if (nBytes <= 0 || nBytes >= sizeof(file)) {
@ -216,7 +265,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
}
int32_t numOfVnodes = 0;
TAOS_CHECK_GOTO(vmGetVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes), &lino, _OVER);
TAOS_CHECK_GOTO(vmGetAllVnodeListFromHash(pMgmt, &numOfVnodes, &ppVnodes), &lino, _OVER);
// terrno = TSDB_CODE_OUT_OF_MEMORY;
pJson = tjsonCreateObject();
@ -233,35 +282,47 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
goto _OVER;
}
code = taosThreadMutexLock(&pMgmt->fileLock);
if (code != 0) {
lino = __LINE__;
goto _OVER;
}
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pFile == NULL) {
code = terrno;
lino = __LINE__;
goto _OVER;
goto _OVER1;
}
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) {
code = terrno;
lino = __LINE__;
goto _OVER;
goto _OVER1;
}
if (taosFsyncFile(pFile) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
lino = __LINE__;
goto _OVER;
goto _OVER1;
}
code = taosCloseFile(&pFile);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
lino = __LINE__;
goto _OVER;
goto _OVER1;
}
TAOS_CHECK_GOTO(taosRenameFile(file, realfile), &lino, _OVER);
TAOS_CHECK_GOTO(taosRenameFile(file, realfile), &lino, _OVER1);
dInfo("succeed to write vnodes file:%s, vnodes:%d", realfile, numOfVnodes);
_OVER1:
ret = taosThreadMutexUnlock(&pMgmt->fileLock);
if (ret != 0) {
dError("failed to unlock since %s", tstrerror(ret));
}
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);

View File

@ -415,24 +415,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER;
}
code = taosThreadMutexLock(&pMgmt->createLock);
if (code != 0) {
dError("vgId:%d, failed to lock since %s", req.vgId, tstrerror(code));
goto _OVER;
}
code = vmWriteVnodeListToFile(pMgmt);
if (code != 0) {
code = terrno != 0 ? terrno : code;
int32_t ret = taosThreadMutexUnlock(&pMgmt->createLock);
if (ret != 0) {
dError("vgId:%d, failed to unlock since %s", req.vgId, tstrerror(ret));
}
goto _OVER;
}
int32_t ret = taosThreadMutexUnlock(&pMgmt->createLock);
if (ret != 0) {
dError("vgId:%d, failed to unlock since %s", req.vgId, tstrerror(ret));
}
_OVER:
if (code != 0) {
@ -442,7 +429,8 @@ _OVER:
dError("vgId:%d, failed to lock since %s", req.vgId, tstrerror(r));
}
if (r == 0) {
r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
dInfo("vgId:%d, remove from hash", req.vgId);
r = taosHashRemove(pMgmt->hash, &req.vgId, sizeof(int32_t));
if (r != 0) {
dError("vgId:%d, failed to remove vnode since %s", req.vgId, tstrerror(r));
}
@ -550,7 +538,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
@ -698,7 +686,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, close vnode", srcVgId);
vmCloseVnode(pMgmt, pVnode, true);
vmCloseVnode(pMgmt, pVnode, true, false);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char srcPath[TSDB_FILENAME_LEN] = {0};
@ -807,7 +795,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
bool commitAndRemoveWal = vnodeShouldRemoveWal(pVnode->pImpl);
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal);
vmCloseVnode(pMgmt, pVnode, commitAndRemoveWal, true);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
@ -875,7 +863,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return code;
}
vmCloseVnode(pMgmt, pVnode, false);
vmCloseVnode(pMgmt, pVnode, false, false);
if (vmWriteVnodeListToFile(pMgmt) != 0) {
dError("vgId:%d, failed to write vnode list since %s", vgId, terrstr());
}

View File

@ -166,16 +166,34 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
(void)taosThreadRwlockWrlock(&pMgmt->lock);
SVnodeObj *pOld = NULL;
int32_t r = taosHashGetDup(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (r != 0) {
dError("vgId:%d, failed to get vnode from hash", pVnode->vgId);
}
if (pOld) {
vmFreeVnodeObj(&pOld);
}
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
pOld = NULL;
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (r != 0) {
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
}
if (pOld) {
vmFreeVnodeObj(&pOld);
}
dInfo("vgId:%d, remove from closedHash", pVnode->vgId);
r = taosHashRemove(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t));
if (r != 0) {
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, bool keepClosed) {
char path[TSDB_FILENAME_LEN] = {0};
bool atExit = true;
@ -185,7 +203,40 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
(void)taosThreadRwlockWrlock(&pMgmt->lock);
int32_t r = taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
if (r != 0) {
dError("vgId:%d, failed to remove vnode from hash", pVnode->vgId);
}
if (keepClosed) {
SVnodeObj *pClosedVnode = taosMemoryCalloc(1, sizeof(SVnodeObj));
(void)memset(pClosedVnode, 0, sizeof(SVnodeObj));
if (pVnode == NULL) {
dError("vgId:%d, failed to alloc vnode since %s", pVnode->vgId, terrstr());
(void)taosThreadRwlockUnlock(&pMgmt->lock);
return;
}
pClosedVnode->vgId = pVnode->vgId;
pClosedVnode->dropped = pVnode->dropped;
pClosedVnode->vgVersion = pVnode->vgVersion;
pClosedVnode->diskPrimary = pVnode->diskPrimary;
pClosedVnode->toVgId = pVnode->toVgId;
SVnodeObj *pOld = NULL;
r = taosHashGetDup(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), (void *)&pOld);
if (r != 0) {
dError("vgId:%d, failed to get vnode from closedHash", pVnode->vgId);
}
if (pOld) {
vmFreeVnodeObj(&pOld);
}
dInfo("vgId:%d, put vnode to closedHash", pVnode->vgId);
r = taosHashPut(pMgmt->closedHash, &pVnode->vgId, sizeof(int32_t), &pClosedVnode, sizeof(SVnodeObj *));
if (r != 0) {
dError("vgId:%d, failed to put vnode to closedHash", pVnode->vgId);
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode);
if (pVnode->failed) {
@ -362,9 +413,15 @@ static void *vmOpenVnodeInThread(void *param) {
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pMgmt->hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to init vnode hash since %s", terrstr());
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pMgmt->closedHash =
taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pMgmt->hash == NULL) {
dError("failed to init vnode closed hash since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
}
SWrapperCfg *pCfgs = NULL;
@ -459,7 +516,7 @@ static void *vmCloseVnodeInThread(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
tmsgReportStartup("vnode-close", stepDesc);
vmCloseVnode(pMgmt, pVnode, false);
vmCloseVnode(pMgmt, pVnode, false, false);
}
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
@ -537,6 +594,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
pMgmt->hash = NULL;
}
void *pIter = taosHashIterate(pMgmt->closedHash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
vmFreeVnodeObj(ppVnode);
pIter = taosHashIterate(pMgmt->closedHash, pIter);
}
if (pMgmt->closedHash != NULL) {
taosHashCleanup(pMgmt->closedHash);
pMgmt->closedHash = NULL;
}
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
@ -545,7 +614,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmStopWorker(pMgmt);
vnodeCleanup();
(void)taosThreadRwlockDestroy(&pMgmt->lock);
(void)taosThreadMutexDestroy(&pMgmt->createLock);
(void)taosThreadMutexDestroy(&pMgmt->fileLock);
taosMemoryFree(pMgmt);
}
@ -637,7 +706,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
goto _OVER;
}
code = taosThreadMutexInit(&pMgmt->createLock, NULL);
code = taosThreadMutexInit(&pMgmt->fileLock, NULL);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _OVER;

View File

@ -400,8 +400,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
pSdb->commitTerm = pSdb->applyTerm;
pSdb->commitConfig = pSdb->applyConfig;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mInfo("read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->commitIndex,
pSdb->commitTerm, pSdb->commitConfig);
mInfo("vgId:1, trans:0, read sdb file:%s success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
_OVER:
if ((ret = taosCloseFile(&pFile)) != 0) {
@ -573,7 +573,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb, int32_t skip_type) {
pSdb->commitIndex = pSdb->applyIndex;
pSdb->commitTerm = pSdb->applyTerm;
pSdb->commitConfig = pSdb->applyConfig;
mInfo("write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
mInfo("vgId:1, trans:0, write sdb file success, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64
" file:%s",
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
}
@ -610,8 +611,8 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
if (code != 0) {
mError("failed to write sdb file since %s", tstrerror(code));
} else {
mInfo("write sdb file success, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64, pSdb->applyIndex,
pSdb->applyTerm, pSdb->applyConfig);
mInfo("vgId:1, trans:0, write sdb file success, apply index:%" PRId64 ", term:%" PRId64 ", config:%" PRId64,
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig);
}
(void)taosThreadMutexUnlock(&pSdb->filelock);
return code;

View File

@ -723,11 +723,9 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
{
SLastCol *pLastCol = NULL;
if (values_list[0] != NULL) {
code = tsdbCacheDeserialize(values_list[0], values_list_sizes[0], &pLastCol);
if (code == TSDB_CODE_INVALID_PARA) {
tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
} else if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
goto _exit;
@ -736,13 +734,12 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
rocksdb_writebatch_delete(wb, keys_list[0], klen);
}
taosMemoryFreeClear(pLastCol);
}
pLastCol = NULL;
if (values_list[1] != NULL) {
code = tsdbCacheDeserialize(values_list[1], values_list_sizes[1], &pLastCol);
if (code == TSDB_CODE_INVALID_PARA) {
tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
} else if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
goto _exit;
@ -751,6 +748,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
rocksdb_writebatch_delete(wb, keys_list[1], klen);
}
taosMemoryFreeClear(pLastCol);
}
rocksdb_free(values_list[0]);
rocksdb_free(values_list[1]);
@ -1218,15 +1216,14 @@ static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray
SColVal *pColVal = &updCtx->colVal;
SLastCol *pLastCol = NULL;
if (values_list[i] != NULL) {
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
if (code == TSDB_CODE_INVALID_PARA) {
tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
} else if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
goto _exit;
}
}
/*
if (code) {
tsdbError("tsdb/cache: vgId:%d, deserialize failed since %s.", TD_VID(pTsdb->pVnode), tstrerror(code));
@ -1692,15 +1689,14 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
continue;
}
if (values_list[i] != NULL) {
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
if (code == TSDB_CODE_INVALID_PARA) {
tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
} else if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
goto _exit;
}
}
SLastCol *pToFree = pLastCol;
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
if (pLastCol && pLastCol->cacheStatus != TSDB_LAST_CACHE_NO_CACHE) {
@ -1959,15 +1955,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < numKeys; ++i) {
SLastCol *pLastCol = NULL;
if (values_list[i] != NULL) {
code = tsdbCacheDeserialize(values_list[i], values_list_sizes[i], &pLastCol);
if (code == TSDB_CODE_INVALID_PARA) {
tsdbTrace("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
} else if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tsdbError("vgId:%d, %s deserialize failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__,
tstrerror(code));
goto _exit;
}
}
SIdxKey *idxKey = taosArrayGet(remainCols, i);
SLastKey *pLastKey = &idxKey->key;
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {

View File

@ -26,7 +26,10 @@ class MonitorTest : public ::testing::Test {
monInit(&cfg);
}
static void TearDownTestSuite() { monCleanup(); }
static void TearDownTestSuite() {
monCleanup();
taosMsleep(100);
}
public:
void SetUp() override {}

View File

@ -23,6 +23,8 @@
#define LOG_MAX_LINE_SIZE (10024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_STACK_LINE_SIZE (512)
#define LOG_MAX_STACK_LINE_BUFFER_SIZE (LOG_MAX_STACK_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 128)
@ -669,16 +671,40 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
}
}
void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...) {
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
/*
use taosPrintLogImpl_useStackBuffer to avoid stack overflow
char buffer[LOG_MAX_LINE_BUFFER_SIZE];
*/
static int8_t taosPrintLogImplUseStackBuffer(const char *flags, int32_t level, int32_t dflag, const char *format,
va_list args) {
char buffer[LOG_MAX_STACK_LINE_BUFFER_SIZE];
int32_t len = taosBuildLogHead(buffer, flags);
va_list argpointer;
va_start(argpointer, format);
int32_t writeLen = len + vsnprintf(buffer + len, LOG_MAX_LINE_BUFFER_SIZE - len, format, argpointer);
va_end(argpointer);
int32_t writeLen = len + vsnprintf(buffer + len, LOG_MAX_STACK_LINE_BUFFER_SIZE - len - 1, format, args);
if (writeLen > LOG_MAX_STACK_LINE_SIZE) {
return 1;
}
buffer[writeLen++] = '\n';
buffer[writeLen] = 0;
taosPrintLogImp(level, dflag, buffer, writeLen);
if (tsLogFp && level <= DEBUG_INFO) {
buffer[writeLen - 1] = 0;
(*tsLogFp)(taosGetTimestampMs(), level, buffer + len);
}
return 0;
}
static int8_t taosPrintLogImplUseHeapBuffer(const char *flags, int32_t level, int32_t dflag, const char *format,
va_list args) {
char *buffer = taosMemoryCalloc(1, LOG_MAX_LINE_BUFFER_SIZE + 1);
if (buffer == NULL) {
return 1;
}
int32_t len = taosBuildLogHead(buffer, flags);
int32_t writeLen = len + vsnprintf(buffer + len, LOG_MAX_LINE_BUFFER_SIZE - len - 1, format, args);
if (writeLen > LOG_MAX_LINE_SIZE) writeLen = LOG_MAX_LINE_SIZE;
buffer[writeLen++] = '\n';
@ -690,6 +716,22 @@ void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *f
buffer[writeLen - 1] = 0;
(*tsLogFp)(taosGetTimestampMs(), level, buffer + len);
}
taosMemoryFree(buffer);
return 0;
}
void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...) {
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
va_list argpointer, argpointer_copy;
va_start(argpointer, format);
va_copy(argpointer_copy, argpointer);
if (taosPrintLogImplUseStackBuffer(flags, level, dflag, format, argpointer) == 0) {
} else {
TAOS_UNUSED(taosPrintLogImplUseHeapBuffer(flags, level, dflag, format, argpointer_copy));
}
va_end(argpointer_copy);
va_end(argpointer);
}
void taosPrintLongString(const char *flags, int32_t level, int32_t dflag, const char *format, ...) {

View File

@ -126,6 +126,13 @@ add_test(
COMMAND regexTest
)
add_executable(logTest "log.cpp")
target_link_libraries(logTest os util common gtest_main)
add_test(
NAME logTest
COMMAND logTest
)
add_executable(decompressTest "decompressTest.cpp")
target_link_libraries(decompressTest os util common gtest_main)
add_test(

46
source/util/test/log.cpp Normal file
View File

@ -0,0 +1,46 @@
#include <gtest/gtest.h>
#include <stdlib.h>
#include <time.h>
#include <random>
#include <tlog.h>
#include <iostream>
using namespace std;
TEST(log, check_log_refactor) {
const char *logDir = "/tmp";
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10000;
tsAsyncLog = 0;
// idxDebugFlag = 143;
strcpy(tsLogDir, (char *)logDir);
taosInitLog(tsLogDir, 10, false);
tsAsyncLog = 0;
uDebugFlag = 143;
std::string str;
str.push_back('a');
for (int i = 0; i < 10000; i += 2) {
str.push_back('a');
uError("write to file %s", str.c_str());
}
str.clear();
for (int i = 0; i < 10000; i += 2) {
str.push_back('a');
uDebug("write to file %s", str.c_str());
}
for (int i = 0; i < 10000; i += 2) {
str.push_back('a');
uInfo("write to file %s", str.c_str());
}
str.clear();
for (int i = 0; i < 10000; i += 2) {
str.push_back('a');
uTrace("write to file %s", str.c_str());
}
taosCloseLog();
}