Merge branch '3.0' into feature/stream

This commit is contained in:
Liu Jicong 2022-08-04 16:20:55 +08:00
commit 56156ef2b1
45 changed files with 441 additions and 307 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 88d26c3
GIT_TAG 766dcc4
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 58f58ee
GIT_TAG c9cc20f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taosws-rs
ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 24b199e
GIT_TAG 9fa7e2f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -29,7 +29,7 @@ There are two ways to install taosdump:
1. backing up all databases: specify `-A` or `-all-databases` parameter.
2. backup multiple specified databases: use `-D db1,db2,... ` parameters;
3. back up some super or normal tables in the specified database: use `-dbname stbname1 stbname2 tbname1 tbname2 ... ` parameters. Note that the first parameter of this input sequence is the database name, and only one database is supported. The second and subsequent parameters are the names of super or normal tables in that database, separated by spaces.
3. back up some super or normal tables in the specified database: use `dbname stbname1 stbname2 tbname1 tbname2 ... ` parameters. Note that the first parameter of this input sequence is the database name, and only one database is supported. The second and subsequent parameters are the names of super or normal tables in that database, separated by spaces.
4. back up the system log database: TDengine clusters usually contain a system database named `log`. The data in this database is the data that TDengine runs itself, and the taosdump will not back up the log database by default. If users need to back up the log database, users can use the `-a` or `-allow-sys` command-line parameter.
5. Loose mode backup: taosdump version 1.4.1 onwards provides `-n` and `-L` parameters for backing up data without using escape characters and "loose" mode, which can reduce the number of backups if table names, column names, tag names do not use escape characters. This can also reduce the backup data time and backup data footprint. If you are unsure about using `-n` and `-L` conditions, please use the default parameters for "strict" mode backup. See the [official documentation](/taos-sql/escape) for a description of escaped characters.
@ -104,7 +104,10 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
use letter and number only. Default is NOT.
-n, --no-escape No escape char '`'. Default is using it.
-T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is
5.
8.
-C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service
-R, --restful Use RESTful interface to connect TDengine
-t, --timeout=SECONDS The timeout seconds for websocket to interact.
-g, --debug Print debug info.
-?, --help Give this help list
--usage Give a short usage message

View File

@ -16,6 +16,7 @@ The following preparations are required for EMQX to add TDengine data sources co
Depending on the current operating system, users can download the installation package from the [EMQX official website](https://www.emqx.io/downloads) and execute the installation. After installation, use `sudo emqx start` or `sudo systemctl start emqx` to start the EMQX service.
Note: this chapter is based on EMQX v4.4.5. Other version of EMQX probably change its user interface, configuration methods or functions.
## Create Database and Table
@ -31,7 +32,7 @@ Note: The table schema is based on the blog [(In Chinese) Data Transfer, Storage
## Configuring EMQX Rules
Since the configuration interface of EMQX differs from version to version, here is v4.4.3 as an example. For other versions, please refer to the corresponding official documentation.
Since the configuration interface of EMQX differs from version to version, here is v4.4.5 as an example. For other versions, please refer to the corresponding official documentation.
### Login EMQX Dashboard

View File

@ -107,7 +107,10 @@ Usage: taosdump [OPTION...] dbname [tbname ...]
use letter and number only. Default is NOT.
-n, --no-escape No escape char '`'. Default is using it.
-T, --thread-num=THREAD_NUM Number of thread for dump in file. Default is
5.
8.
-C, --cloud=CLOUD_DSN specify a DSN to access TDengine cloud service
-R, --restful Use RESTful interface to connect TDengine
-t, --timeout=SECONDS The timeout seconds for websocket to interact.
-g, --debug Print debug info.
-?, --help Give this help list
--usage Give a short usage message

View File

@ -17,6 +17,7 @@ MQTT 是流行的物联网数据传输协议,[EMQX](https://github.com/emqx/em
用户可以根据当前的操作系统,到 EMQX 官网下载安装包,并执行安装。下载地址如下:<https://www.emqx.io/zh/downloads>。安装后使用 `sudo emqx start``sudo systemctl start emqx` 启动 EMQX 服务。
注意:本文基于 EMQX v4.4.5 版本,其他版本由于相关配置界面、配置方法以及功能可能随着版本升级有所区别。
## 创建数据库和表
@ -32,7 +33,7 @@ CREATE TABLE sensor_data (ts TIMESTAMP, temperature FLOAT, humidity FLOAT, volum
## 配置 EMQX 规则
由于 EMQX 不同版本配置界面所有不同,这里仅以 v4.4.3 为例,其他版本请参考相应官网文档。
由于 EMQX 不同版本配置界面所有不同,这里仅以 v4.4.5 为例,其他版本请参考相应官网文档。
### 登录 EMQX Dashboard

View File

@ -227,8 +227,7 @@ typedef struct SSubmitBlk {
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
int16_t padding; // TODO just for padding here
int32_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
@ -256,7 +255,7 @@ typedef struct {
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
int32_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
int32_t numOfBlocks;
const void* pMsg;

View File

@ -49,6 +49,8 @@ void osDefaultInit();
void osUpdate();
void osCleanup();
bool osLogSpaceAvailable();
bool osDataSpaceAvailable();
bool osTempSpaceAvailable();
void osSetTimezone(const char *timezone);
void osSetSystemLocale(const char *inLocale, const char *inCharSet);

View File

@ -76,7 +76,7 @@ void taos_cleanup(void) {
cleanupTaskQueue();
taosConvDestroy();
tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();
@ -680,7 +680,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type;
pRequest->stmtType = pQuery->pRoot->type;
}
}
@ -785,9 +785,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (NULL == pQuery->pRoot) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
}
}
@ -809,6 +809,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, &catalogReq, retrieveMetaCallback, pWrapper,
&pRequest->body.queryJob);
pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) {
return;
}
@ -816,6 +817,8 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
_error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
taosMemoryFree(pCxt);
terrno = code;
pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
@ -857,7 +860,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
}
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);

View File

@ -30,6 +30,10 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
setErrno(pRequest, code);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pRequest->pTscObj, pRequest->targetTableList);
}
taosMemoryFree(pMsg->pData);
if (pRequest->body.queryFp != NULL) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);

View File

@ -3144,10 +3144,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1;
@ -3373,10 +3372,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pSW->version);
blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows);
blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks++;

View File

@ -2028,11 +2028,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
} else {

View File

@ -452,7 +452,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "logDir");
tstrncpy(tsLogDir, cfgGetItem(pCfg, "logDir")->str, PATH_MAX);
taosExpandDir(tsLogDir, tsLogDir, PATH_MAX);
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024);
tsNumOfLogLines = cfgGetItem(pCfg, "numOfLogLines")->i32;
tsAsyncLog = cfgGetItem(pCfg, "asyncLog")->bval;
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
@ -502,7 +502,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
@ -540,7 +540,7 @@ static void taosSetSystemCfg(SConfig *pCfg) {
}
static int32_t taosSetServerCfg(SConfig *pCfg) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024);
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
@ -739,15 +739,15 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
}
case 'i': {
if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
tsTempSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalTmpDirGB")->fval) * 1024 * 1024 * 1024);
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
tsDataSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalDataDirGB")->fval) * 1024 * 1024 * 1024);
} else if (strcasecmp("minSlidingTime", name) == 0) {
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
} else if (strcasecmp("minIntervalTime", name) == 0) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
} else if (strcasecmp("minimalLogDirGB", name) == 0) {
tsLogSpace.reserved = cfgGetItem(pCfg, "minimalLogDirGB")->fval;
tsLogSpace.reserved = (int64_t)(((double)cfgGetItem(pCfg, "minimalLogDirGB")->fval) * 1024 * 1024 * 1024);
}
break;
}

View File

@ -76,7 +76,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
pIter->sversion = htonl((*pPBlock)->sversion);
pIter->dataLen = htonl((*pPBlock)->dataLen);
pIter->schemaLen = htonl((*pPBlock)->schemaLen);
pIter->numOfRows = htons((*pPBlock)->numOfRows);
pIter->numOfRows = htonl((*pPBlock)->numOfRows);
}
return 0;
}

View File

@ -30,6 +30,12 @@ static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
* @return
*/
STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_TSC_NO_DISKSPACE;
// tscError("tmp file created failed since %s", terrstr());
return NULL;
}
STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf));
if (pTSBuf == NULL) {
return NULL;

View File

@ -176,7 +176,11 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg);
break;
case WRITE_QUEUE:
if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
if (!osDataSpaceAvailable()) {
terrno = TSDB_CODE_VND_NO_DISKSPACE;
code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());
} else if ((pMsg->msgType == TDMT_VND_SUBMIT) && (grantCheck(TSDB_GRANT_STORAGE) != TSDB_CODE_SUCCESS)) {
terrno = TSDB_CODE_VND_NO_WRITE_AUTH;
code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr());

View File

@ -49,8 +49,26 @@ static int32_t dmInitMonitor() {
return 0;
}
static bool dmCheckDiskSpace() {
osUpdate();
if (!osDataSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least , quit", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
}
if (!osLogSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
}
if (!osTempSpaceAvailable()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
return false;
}
return true;
}
int32_t dmInit(int8_t rtype) {
dInfo("start to init dnode env");
if (!dmCheckDiskSpace()) return -1;
if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
if (dmInitSystem() != 0) return -1;
if (dmInitMonitor() != 0) return -1;

View File

@ -265,6 +265,7 @@ static void dmWatchNodes(SDnode *pDnode) {
}
int32_t dmRunDnode(SDnode *pDnode) {
int count = 0;
if (dmOpenNodes(pDnode) != 0) {
dError("failed to open nodes since %s", terrstr());
return -1;
@ -274,7 +275,6 @@ int32_t dmRunDnode(SDnode *pDnode) {
dError("failed to start nodes since %s", terrstr());
return -1;
}
while (1) {
if (pDnode->stop) {
dInfo("TDengine is about to stop");
@ -285,6 +285,9 @@ int32_t dmRunDnode(SDnode *pDnode) {
}
dmWatchNodes(pDnode);
if (count == 0) osUpdate();
count %= 10;
count++;
taosMsleep(100);
}
}

View File

@ -531,7 +531,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
}
if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
terrno = TSDB_CODE_PAR_INVALID_COLUMNS_NUM;
return -1;
}
@ -542,7 +542,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
SField *pField = taosArrayGet(pCreate->pColumns, 0);
if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
terrno = TSDB_CODE_PAR_INVALID_FIRST_COLUMN;
return -1;
}

View File

@ -437,7 +437,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
pField->bytes = 8;
if (mndCheckCreateStbReq(&createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}

View File

@ -149,7 +149,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data;
return sdbStopWrite(pMnode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
pSnapshot->lastConfigIndex);

View File

@ -117,7 +117,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);

View File

@ -111,7 +111,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
// pBlock->sversion = htonl(pBlock->sversion);
// pBlock->dataLen = htonl(pBlock->dataLen);
// pBlock->schemaLen = htonl(pBlock->schemaLen);
// pBlock->numOfRows = htons(pBlock->numOfRows);
// pBlock->numOfRows = htonl(pBlock->numOfRows);
#if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {

View File

@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastApplyIndex);
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);

View File

@ -395,9 +395,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pBlk->uid = htobe64(tbUid);
pBlk->suid = htobe64(tbUid);
pBlk->sversion = htonl(schemaVer);
pBlk->padding = htonl(0);
pBlk->schemaLen = htonl(0);
pBlk->numOfRows = htons(mockRowNum);
pBlk->numOfRows = htonl(mockRowNum);
pBlk->dataLen = htonl(mockRowNum * mockRowLen);
for (uint32_t r = 0; r < mockRowNum; ++r) {
pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen);

File diff suppressed because it is too large Load Diff

View File

@ -225,7 +225,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
}
blkHead->dataLen = htonl(dataLen);
blkHead->numOfRows = htons(rows);
blkHead->numOfRows = htonl(rows);
ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);

View File

@ -3398,7 +3398,12 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
uint32_t defaultBufsz = 0;
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -4662,7 +4667,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
if (bufSize <= pageSize) {
bufSize = pageSize * 4;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
}

View File

@ -764,7 +764,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
uint32_t defaultBufsz = 0;
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
pTaskInfo->code = terrno;
qError("Create partition operator info failed since %s", terrstr(terrno));
goto _error;
}
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -247,7 +247,12 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
return NULL;
}
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
printf("tHash Init failed since %s", terrstr(terrno));
return NULL;
}
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, tsTempDir);
if (code != 0) {
terrno = code;
return NULL;

View File

@ -159,7 +159,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0;
if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Add to buf failed since %s", terrstr(terrno));
return terrno;
}
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -233,7 +238,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} else {
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
code = terrno;
qError("Sort compare init failed since %s", terrstr(terrno));
return code;
}
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;

View File

@ -1461,15 +1461,14 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i);
int32_t ret = convertStringToTimestamp(paraType, pValue->datum.p, dbPrec, &timeVal[i - 1]);
int32_t ret = convertStringToTimestamp(paraType, pValue->datum.p, dbPrec, &timeVal[i - 1]);
if (ret != TSDB_CODE_SUCCESS) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
}
if (timeVal[0] > timeVal[1]) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"INTERP function invalid time range");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "INTERP function invalid time range");
}
}
@ -2136,7 +2135,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "apercentile",
.type = FUNCTION_TYPE_APERCENTILE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateApercentile,
.getEnvFunc = getApercentileFuncEnv,
.initFunc = apercentileFunctionSetup,

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taoserror.h"
#include "tglobal.h"
#include "tcompare.h"
@ -257,7 +258,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
resetSlotInfo(pBucket);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", TD_TMP_DIR_PATH);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
// qError("MemBucket create disk based Buf failed since %s", terrstr(terrno));
tMemBucketDestroy(pBucket);
return NULL;
}
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(pBucket);
return NULL;

View File

@ -412,22 +412,31 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
msgInfo->code = terrno;
fnError("udfd create shared library failed since %s", terrstr(terrno));
goto _return;
}
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name);
snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name);
#else
snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name);
snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
#endif
TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
}
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
@ -686,7 +695,6 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->len = 0;
}
}
fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
}
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {

View File

@ -122,7 +122,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *
pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;

View File

@ -292,11 +292,10 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
int32_t schemaLen = blk->schemaLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
}
}
@ -1267,7 +1266,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
dataBuf->numOfTables = 1;
@ -1339,7 +1338,7 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
dataBuf->numOfTables = 1;
@ -1986,7 +1985,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;
@ -2074,7 +2073,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
}
@ -2444,7 +2443,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767");
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
}
return TSDB_CODE_SUCCESS;

View File

@ -5845,11 +5845,16 @@ static int32_t createTagValFromExpr(STranslateContext* pCxt, SDataType targetDt,
}
static int32_t createTagValFromVal(STranslateContext* pCxt, SDataType targetDt, SNode* pNode, SValueNode** pVal) {
*pVal = (SValueNode*)nodesCloneNode(pNode);
if (NULL == *pVal) {
SValueNode* pTempVal = (SValueNode*)nodesCloneNode(pNode);
if (NULL == pTempVal) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return DEAL_RES_ERROR == translateValueImpl(pCxt, *pVal, targetDt, true) ? pCxt->errCode : TSDB_CODE_SUCCESS;
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pTempVal, targetDt, true)) {
nodesDestroyNode((SNode*)pTempVal);
return pCxt->errCode;
}
*pVal = pTempVal;
return TSDB_CODE_SUCCESS;
}
static int32_t createTagVal(STranslateContext* pCxt, uint8_t precision, SSchema* pSchema, SNode* pNode,

View File

@ -110,15 +110,15 @@ class InsertTest : public Test {
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
cout << "Block:" << i << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", padding:" << ntohl(blk->padding)
<< ", sversion:" << ntohl(blk->sversion) << ", dataLen:" << ntohl(blk->dataLen)
<< ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
<< ", numOfRows:" << ntohl(blk->numOfRows) << endl;
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}
}
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) {
void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
@ -134,7 +134,7 @@ class InsertTest : public Test {
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) {
ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
}
}

View File

@ -573,7 +573,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
walFsync(pWal, false);
// update match index
matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
@ -694,7 +694,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// fsync once
SSyncLogStoreData* pData = ths->pLogStore->data;
SWal* pWal = pData->pWal;
walFsync(pWal, true);
walFsync(pWal, false);
}
// prepare response msg

View File

@ -206,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta;
SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
@ -444,7 +444,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal* pWal = pData->pWal;
SyncIndex index = 0;
SWalSyncInfo syncMeta;
SWalSyncInfo syncMeta = {0};
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;

View File

@ -132,8 +132,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1;
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
// SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
@ -224,8 +225,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
// SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
SyncIndex newNextIndex = nextIndex + 1;
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
// SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64

View File

@ -105,6 +105,10 @@ void osCleanup() {}
bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; }
bool osDataSpaceAvailable() { return tsDataSpace.reserved <= tsDataSpace.size.avail; }
bool osTempSpaceAvailable() { return tsTempSpace.reserved <= tsTempSpace.size.avail; }
void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); }
void osSetSystemLocale(const char *inLocale, const char *inCharSet) {

View File

@ -691,9 +691,14 @@ static void taosWriteLog(SLogBuff *pLogBuf) {
static void *taosAsyncOutputLog(void *param) {
SLogBuff *pLogBuf = (SLogBuff *)param;
setThreadName("log");
int32_t count = 0;
while (1) {
count += tsWriteInterval;
taosMsleep(tsWriteInterval);
if (count > 1000) {
osUpdate();
count = 0;
}
// Polling the buffer
taosWriteLog(pLogBuf);

View File

@ -0,0 +1,19 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400