diff --git a/documentation20/webdocs/markdowndocs/administrator-ch.md b/documentation20/webdocs/markdowndocs/administrator-ch.md
index 4b274e05e6..1e36e6c5e6 100644
--- a/documentation20/webdocs/markdowndocs/administrator-ch.md
+++ b/documentation20/webdocs/markdowndocs/administrator-ch.md
@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景
Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
```
-示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44851T。TDengine大概需要消耗44851/5=8970T, 8.9P空间。
+示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44.8512T。TDengine大概需要消耗44.851/5=8.97024T空间。
用户可以通过参数keep,设置数据在磁盘中的最大保存时长。为进一步减少存储成本,TDengine还提供多级存储,最冷的数据可以存放在最廉价的存储介质上,应用的访问不用做任何调整,只是读取速度降低了。
diff --git a/documentation20/webdocs/markdowndocs/faq-ch.md b/documentation20/webdocs/markdowndocs/faq-ch.md
index 80deb889ef..757b6d9929 100644
--- a/documentation20/webdocs/markdowndocs/faq-ch.md
+++ b/documentation20/webdocs/markdowndocs/faq-ch.md
@@ -38,9 +38,9 @@
6. 检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的
-7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/lib/taos*里, 并且*/usr/local/lib/taos*在系统库函数搜索路径*LD_LIBRARY_PATH*里
+7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH*里
-8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*driver/c/taos.dll*在你的系统搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
+8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} `
diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh
index d91daaa5c4..569f316ff3 100755
--- a/packaging/tools/post.sh
+++ b/packaging/tools/post.sh
@@ -121,8 +121,11 @@ function install_config() {
echo -e -n "${GREEN}Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join${NC}"
echo
echo -e -n "${GREEN}OR leave it blank to build one${NC}:"
- read firstEp
- while true; do
+ #read firstEp
+ if exec < /dev/tty; then
+ read firstEp;
+ fi
+ while true; do
if [ ! -z "$firstEp" ]; then
# check the format of the firstEp
#if [[ $firstEp == $FQDN_PATTERN ]]; then
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index 7dfd561960..b599be9c13 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -4427,8 +4427,8 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp";
const char* msg1 = "invalid column name";
- const char* msg2 = "only support order by primary timestamp and queried column";
- const char* msg3 = "only support order by primary timestamp and first tag in groupby clause";
+ const char* msg2 = "only support order by primary timestamp or queried column";
+ const char* msg3 = "only support order by primary timestamp or first tag in groupby clause";
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c
index 49d21d9275..40b524488a 100644
--- a/src/common/src/tdataformat.c
+++ b/src/common/src/tdataformat.c
@@ -434,12 +434,12 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
- ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
ASSERT(target->numOfCols == source->numOfCols);
SDataCols *pTarget = NULL;
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
+ ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0) {
@@ -509,6 +509,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
(*iter2)++;
if (key1 == key2) (*iter1)++;
}
+
+ ASSERT(target->numOfRows <= target->maxPoints);
}
}
diff --git a/src/dnode/inc/dnodeVWrite.h b/src/dnode/inc/dnodeVWrite.h
index 7da701a8e2..323405143f 100644
--- a/src/dnode/inc/dnodeVWrite.h
+++ b/src/dnode/inc/dnodeVWrite.h
@@ -20,9 +20,12 @@
extern "C" {
#endif
-int32_t dnodeInitVnodeWrite();
-void dnodeCleanupVnodeWrite();
-void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg);
+int32_t dnodeInitVWrite();
+void dnodeCleanupVWrite();
+void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
+void * dnodeAllocVWriteQueue(void *pVnode);
+void dnodeFreeVWriteQueue(void *wqueue);
+void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus
}
diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c
index c9cb59150c..b4c174e29b 100644
--- a/src/dnode/src/dnodeMain.c
+++ b/src/dnode/src/dnodeMain.c
@@ -62,7 +62,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
- {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
+ {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c
index a7c76a3473..c6fc2b9e36 100644
--- a/src/dnode/src/dnodePeer.c
+++ b/src/dnode/src/dnodePeer.c
@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL;
int32_t dnodeInitServer() {
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
- dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
+ dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c
index 35ea6ae8b3..33cda60793 100644
--- a/src/dnode/src/dnodeShell.c
+++ b/src/dnode/src/dnodeShell.c
@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() {
- dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
+ dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
- dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
+ dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c
index ab58e0470f..e973f9901f 100644
--- a/src/dnode/src/dnodeTelemetry.c
+++ b/src/dnode/src/dnodeTelemetry.c
@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) {
return;
}
- if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
+ if (taosRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
}
diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index cd913cc100..abf3cb527d 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
}
}
-void *dnodeAllocateVnodeRqueue(void *pVnode) {
+void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue();
if (queue == NULL) {
@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
return queue;
}
-void dnodeFreeVnodeRqueue(void *rqueue) {
+void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue);
// dynamically adjust the number of threads
diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c
index d36c140f43..3a820f180c 100644
--- a/src/dnode/src/dnodeVWrite.c
+++ b/src/dnode/src/dnodeVWrite.c
@@ -15,74 +15,65 @@
#define _DEFAULT_SOURCE
#include "os.h"
-#include "taosmsg.h"
-#include "taoserror.h"
-#include "tutil.h"
+#include "tglobal.h"
#include "tqueue.h"
-#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
-#include "tdataformat.h"
-#include "tglobal.h"
#include "tsync.h"
#include "vnode.h"
-#include "dnodeInt.h"
#include "syncInt.h"
-#include "dnodeVWrite.h"
-#include "dnodeMgmt.h"
+#include "dnodeInt.h"
typedef struct {
- taos_qall qall;
- taos_qset qset; // queue set
- pthread_t thread; // thread
- int32_t workerId; // worker ID
+ taos_qall qall;
+ taos_qset qset; // queue set
+ int32_t workerId; // worker ID
+ pthread_t thread; // thread
} SWriteWorker;
typedef struct {
- SRspRet rspRet;
- int32_t processedCount;
- int32_t code;
- void *pCont;
- int32_t contLen;
- SRpcMsg rpcMsg;
+ SRspRet rspRet;
+ SRpcMsg rpcMsg;
+ int32_t processedCount;
+ int32_t code;
+ int32_t contLen;
+ void * pCont;
} SWriteMsg;
typedef struct {
- int32_t max; // max number of workers
- int32_t nextId; // from 0 to max-1, cyclic
- SWriteWorker *writeWorker;
+ int32_t max; // max number of workers
+ int32_t nextId; // from 0 to max-1, cyclic
+ SWriteWorker *worker;
pthread_mutex_t mutex;
} SWriteWorkerPool;
+static SWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param);
-static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
-SWriteWorkerPool wWorkerPool;
+int32_t dnodeInitVWrite() {
+ tsVWriteWP.max = tsNumOfCores;
+ tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max);
+ if (tsVWriteWP.worker == NULL) return -1;
+ pthread_mutex_init(&tsVWriteWP.mutex, NULL);
-int32_t dnodeInitVnodeWrite() {
- wWorkerPool.max = tsNumOfCores;
- wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
- if (wWorkerPool.writeWorker == NULL) return -1;
- pthread_mutex_init(&wWorkerPool.mutex, NULL);
-
- for (int32_t i = 0; i < wWorkerPool.max; ++i) {
- wWorkerPool.writeWorker[i].workerId = i;
+ for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
+ tsVWriteWP.worker[i].workerId = i;
}
- dInfo("dnode write is initialized, max worker %d", wWorkerPool.max);
+ dInfo("dnode vwrite is initialized, max worker %d", tsVWriteWP.max);
return 0;
}
-void dnodeCleanupVnodeWrite() {
- for (int32_t i = 0; i < wWorkerPool.max; ++i) {
- SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
+void dnodeCleanupVWrite() {
+ for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
+ SWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset);
}
}
- for (int32_t i = 0; i < wWorkerPool.max; ++i) {
- SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
+ for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
+ SWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall);
@@ -90,13 +81,13 @@ void dnodeCleanupVnodeWrite() {
}
}
- pthread_mutex_destroy(&wWorkerPool.mutex);
- free(wWorkerPool.writeWorker);
- dInfo("dnode write is closed");
+ pthread_mutex_destroy(&tsVWriteWP.mutex);
+ tfree(tsVWriteWP.worker);
+ dInfo("dnode vwrite is closed");
}
-void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
- char *pCont = (char *)pMsg->pCont;
+void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg) {
+ char *pCont = pMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
@@ -111,7 +102,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) {
// put message into queue
- SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
+ SWriteMsg *pWrite = taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
@@ -130,12 +121,12 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
}
}
-void *dnodeAllocateVnodeWqueue(void *pVnode) {
- pthread_mutex_lock(&wWorkerPool.mutex);
- SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
+void *dnodeAllocVWriteQueue(void *pVnode) {
+ pthread_mutex_lock(&tsVWriteWP.mutex);
+ SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue();
if (queue == NULL) {
- pthread_mutex_unlock(&wWorkerPool.mutex);
+ pthread_mutex_unlock(&tsVWriteWP.mutex);
return NULL;
}
@@ -143,7 +134,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) {
taosCloseQueue(queue);
- pthread_mutex_unlock(&wWorkerPool.mutex);
+ pthread_mutex_unlock(&tsVWriteWP.mutex);
return NULL;
}
@@ -152,7 +143,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
- pthread_mutex_unlock(&wWorkerPool.mutex);
+ pthread_mutex_unlock(&tsVWriteWP.mutex);
return NULL;
}
pthread_attr_t thAttr;
@@ -160,37 +151,35 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
- dError("failed to create thread to process read queue, reason:%s", strerror(errno));
+ dError("failed to create thread to process vwrite queue since %s", strerror(errno));
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
queue = NULL;
} else {
- dDebug("write worker:%d is launched", pWorker->workerId);
- wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
+ dDebug("dnode vwrite worker:%d is launched", pWorker->workerId);
+ tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
}
pthread_attr_destroy(&thAttr);
} else {
taosAddIntoQset(pWorker->qset, queue, pVnode);
- wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
+ tsVWriteWP.nextId = (tsVWriteWP.nextId + 1) % tsVWriteWP.max;
}
- pthread_mutex_unlock(&wWorkerPool.mutex);
- dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
+ pthread_mutex_unlock(&tsVWriteWP.mutex);
+ dDebug("pVnode:%p, dnode vwrite queue:%p is allocated", pVnode, queue);
return queue;
}
-void dnodeFreeVnodeWqueue(void *wqueue) {
+void dnodeFreeVWriteQueue(void *wqueue) {
taosCloseQueue(wqueue);
-
- // dynamically adjust the number of threads
}
-void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
- SWriteMsg *pWrite = (SWriteMsg *)param;
- if (pWrite == NULL) return;
+void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
+ if (param == NULL) return;
+ SWriteMsg *pWrite = param;
if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
@@ -215,44 +204,45 @@ static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param;
SWriteMsg * pWrite;
SWalHead * pHead;
- int32_t numOfMsgs;
- int type;
- void * pVnode, *item;
SRspRet * pRspRet;
+ void * pVnode;
+ void * pItem;
+ int32_t numOfMsgs;
+ int32_t qtype;
- dDebug("write worker:%d is running", pWorker->workerId);
+ dDebug("dnode vwrite worker:%d is running", pWorker->workerId);
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) {
- dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
+ dDebug("qset:%p, dnode vwrite got no message from qset, exiting", pWorker->qset);
break;
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL;
pRspRet = NULL;
- taosGetQitem(pWorker->qall, &type, &item);
- if (type == TAOS_QTYPE_RPC) {
- pWrite = (SWriteMsg *)item;
+ taosGetQitem(pWorker->qall, &qtype, &pItem);
+ if (qtype == TAOS_QTYPE_RPC) {
+ pWrite = pItem;
pRspRet = &pWrite->rspRet;
- pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
+ pHead = (SWalHead *)((char *)pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]);
- } else if (type == TAOS_QTYPE_CQ) {
- pHead = (SWalHead *)((char*)item + sizeof(SSyncHead));
+ } else if (qtype == TAOS_QTYPE_CQ) {
+ pHead = (SWalHead *)((char *)pItem + sizeof(SSyncHead));
dTrace("%p, CQ wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
} else {
- pHead = (SWalHead *)item;
+ pHead = pItem;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
}
- int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
+ int32_t code = vnodeProcessWrite(pVnode, qtype, pHead, pRspRet);
dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
pHead->version, tstrerror(code));
@@ -267,17 +257,17 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one
taosResetQitems(pWorker->qall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
- taosGetQitem(pWorker->qall, &type, &item);
- if (type == TAOS_QTYPE_RPC) {
- pWrite = (SWriteMsg *)item;
- dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
- } else if (type == TAOS_QTYPE_FWD) {
- pHead = (SWalHead *)item;
+ taosGetQitem(pWorker->qall, &qtype, &pItem);
+ if (qtype == TAOS_QTYPE_RPC) {
+ pWrite = pItem;
+ dnodeSendRpcVWriteRsp(pVnode, pItem, pWrite->rpcMsg.code);
+ } else if (qtype == TAOS_QTYPE_FWD) {
+ pHead = pItem;
vnodeConfirmForward(pVnode, pHead->version, 0);
- taosFreeQitem(item);
+ taosFreeQitem(pItem);
vnodeRelease(pVnode);
} else {
- taosFreeQitem(item);
+ taosFreeQitem(pItem);
vnodeRelease(pVnode);
}
}
@@ -285,19 +275,3 @@ static void *dnodeProcessWriteQueue(void *param) {
return NULL;
}
-
-UNUSED_FUNC
-static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
- int32_t num = taosGetQueueNumber(pWorker->qset);
-
- if (num > 0) {
- usleep(30000);
- sched_yield();
- } else {
- taosFreeQall(pWorker->qall);
- taosCloseQset(pWorker->qset);
- pWorker->qset = NULL;
- dDebug("write worker:%d is released", pWorker->workerId);
- pthread_exit(NULL);
- }
-}
diff --git a/src/inc/dnode.h b/src/inc/dnode.h
index f7ebed31cf..b4973cc672 100644
--- a/src/inc/dnode.h
+++ b/src/inc/dnode.h
@@ -53,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
-void *dnodeAllocateVnodeWqueue(void *pVnode);
-void dnodeFreeVnodeWqueue(void *queue);
-void *dnodeAllocateVnodeRqueue(void *pVnode);
-void dnodeFreeVnodeRqueue(void *rqueue);
-void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
+void *dnodeAllocVWriteQueue(void *pVnode);
+void dnodeFreeVWriteQueue(void *wqueue);
+void *dnodeAllocVReadQueue(void *pVnode);
+void dnodeFreeVReadQueue(void *rqueue);
+void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue();
diff --git a/src/inc/twal.h b/src/inc/twal.h
index 94bdcacfce..3a229ed835 100644
--- a/src/inc/twal.h
+++ b/src/inc/twal.h
@@ -51,6 +51,7 @@ void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
+void walStop(twalh);
void walClose(twalh);
int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *);
diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c
index 748b7e7929..9b166f9351 100644
--- a/src/kit/shell/src/shellEngine.c
+++ b/src/kit/shell/src/shellEngine.c
@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
}
*p++ = c;
- if (c == ';') {
+ if (c == ';' && quote == 0) {
c = *p;
*p = 0;
if (shellRunSingleCommand(con, cmd) < 0) {
diff --git a/src/os/inc/os.h b/src/os/inc/os.h
index 86e16db8b1..9720499004 100644
--- a/src/os/inc/os.h
+++ b/src/os/inc/os.h
@@ -52,9 +52,10 @@ extern "C" {
#include "osWindows.h"
#endif
+#include "osDef.h"
+#include "osAlloc.h"
#include "osAtomic.h"
#include "osCommon.h"
-#include "osDef.h"
#include "osDir.h"
#include "osFile.h"
#include "osLz4.h"
diff --git a/src/util/inc/talloc.h b/src/os/inc/osAlloc.h
similarity index 86%
rename from src/util/inc/talloc.h
rename to src/os/inc/osAlloc.h
index 1fc4d759b0..2d97017480 100644
--- a/src/util/inc/talloc.h
+++ b/src/os/inc/osAlloc.h
@@ -13,25 +13,23 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_UTIL_ALLOC_H
-#define TDENGINE_UTIL_ALLOC_H
+#ifndef TDENGINE_OS_ALLOC_H
+#define TDENGINE_OS_ALLOC_H
#ifdef __cplusplus
extern "C" {
#endif
-#define TSDB_USE_SYS_MEM
-
-#ifdef TSDB_USE_SYS_MEM
+#ifndef TAOS_OS_FUNC_ALLOC
#define tmalloc(size) malloc(size)
- #define tcalloc(size) calloc(1, size)
+ #define tcalloc(nmemb, size) calloc(nmemb, size)
#define trealloc(p, size) realloc(p, size)
#define tmemalign(alignment, size) malloc(size)
#define tfree(p) free(p)
#define tmemzero(p, size) memset(p, 0, size)
#else
void *tmalloc(int32_t size);
- void *tcalloc(int32_t size);
+ void *tcalloc(int32_t nmemb, int32_t size);
void *trealloc(void *p, int32_t size);
void *tmemalign(int32_t alignment, int32_t size);
void tfree(void *p);
diff --git a/src/os/inc/osDarwin.h b/src/os/inc/osDarwin.h
index c1a950fbe6..7bb844831e 100644
--- a/src/os/inc/osDarwin.h
+++ b/src/os/inc/osDarwin.h
@@ -72,8 +72,6 @@ extern "C" {
#include
#define TAOS_OS_FUNC_FILE_SENDIFLE
- #define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
- #define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define TAOS_OS_FUNC_SEMPHONE
#define tsem_t dispatch_semaphore_t
diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h
index dc19c8177c..62e44d8eb0 100644
--- a/src/os/inc/osFile.h
+++ b/src/os/inc/osFile.h
@@ -20,46 +20,52 @@
extern "C" {
#endif
-ssize_t taosTReadImp(int fd, void *buf, size_t count);
-ssize_t taosTWriteImp(int fd, void *buf, size_t count);
+#define tread(fd, buf, count) read(fd, buf, count)
+#define twrite(fd, buf, count) write(fd, buf, count)
+#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
+#define tclose(fd) \
+ { \
+ if (FD_VALID(fd)) { \
+ close(fd); \
+ fd = FD_INITIALIZER; \
+ } \
+ }
-ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size);
-int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count);
+int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
+int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
+int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
+int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath);
-#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
- #define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
- #define taosFSendFile(outfile, infile, offset, count) taosTSendFileImp(fileno(outfile), fileno(infile), offset, size)
-#endif
+#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
+#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
+#define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence)
+#define taosClose(x) tclose(x)
-#define taosTRead(fd, buf, count) taosTReadImp(fd, buf, count)
-#define taosTWrite(fd, buf, count) taosTWriteImp(fd, buf, count)
-#define taosLSeek(fd, offset, whence) lseek(fd, offset, whence)
+// TAOS_OS_FUNC_FILE_SENDIFLE
+int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
+int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
#ifdef TAOS_RANDOM_FILE_FAIL
- void taosSetRandomFileFailFactor(int factor);
+ void taosSetRandomFileFailFactor(int32_t factor);
void taosSetRandomFileFailOutput(const char *path);
#ifdef TAOS_RANDOM_FILE_FAIL_TEST
- ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
- ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
- off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line);
- #undef taosTRead
- #undef taosTWrite
+ int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
+ int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line);
+ int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line);
+ #undef taosRead
+ #undef taosWrite
#undef taosLSeek
- #define taosTRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
- #define taosTWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
+ #define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
+ #define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
#endif
#endif
-int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath);
-
// TAOS_OS_FUNC_FILE_GETTMPFILEPATH
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath);
-#ifndef TAOS_OS_FUNC_FILE_FTRUNCATE
- #define taosFtruncate ftruncate
-#endif
-
+// TAOS_OS_FUNC_FILE_FTRUNCATE
+int32_t taosFtruncate(int32_t fd, int64_t length);
#ifdef __cplusplus
}
#endif
diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h
index 0ab3ff0fca..cbfdedef48 100644
--- a/src/os/inc/osSocket.h
+++ b/src/os/inc/osSocket.h
@@ -33,21 +33,19 @@ extern "C" {
x = FD_INITIALIZER; \
} \
}
- typedef int SOCKET;
+ typedef int32_t SOCKET;
#endif
#ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1
#endif
-#define taosClose(x) taosCloseSocket(x)
-
#ifdef TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
- ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags);
- ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen);
- ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count);
- ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count);
+ int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags);
+ int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen);
+ int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count);
+ int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
@@ -60,14 +58,14 @@ extern "C" {
#endif
// TAOS_OS_FUNC_SOCKET
-int taosSetNonblocking(SOCKET sock, int on);
-void taosBlockSIGPIPE();
+int32_t taosSetNonblocking(SOCKET sock, int32_t on);
+void taosBlockSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
-int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen);
+int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
// TAOS_OS_FUNC_SOCKET_INET
-uint32_t taosInetAddr(char *ipAddr);
+uint32_t taosInetAddr(char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt);
#ifdef __cplusplus
diff --git a/src/os/inc/osWindows.h b/src/os/inc/osWindows.h
index dc1da35037..36e30528bf 100644
--- a/src/os/inc/osWindows.h
+++ b/src/os/inc/osWindows.h
@@ -62,11 +62,8 @@ extern "C" {
#define TAOS_OS_FUNC_FILE_ISDIR
#define TAOS_OS_FUNC_FILE_ISLNK
#define TAOS_OS_FUNC_FILE_SENDIFLE
- #define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
- #define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define TAOS_OS_FUNC_FILE_GETTMPFILEPATH
-#define TAOS_OS_FUNC_FILE_FTRUNCATE
- extern int taosFtruncate(int fd, int64_t length);
+#define TAOS_OS_FUNC_FILE_FTRUNCATE
#define TAOS_OS_FUNC_MATH
#define SWAP(a, b, c) \
@@ -139,7 +136,6 @@ typedef int (*__compar_fn_t)(const void *, const void *);
#define in_addr_t unsigned long
#define socklen_t int
#define htobe64 htonll
-#define twrite write
#define getpid _getpid
struct tm *localtime_r(const time_t *timep, struct tm *result);
diff --git a/src/os/src/darwin/darwinFile.c b/src/os/src/darwin/darwinFile.c
index 66bdb5b939..dacf4db741 100644
--- a/src/os/src/darwin/darwinFile.c
+++ b/src/os/src/darwin/darwinFile.c
@@ -19,21 +19,19 @@
#define _SEND_FILE_STEP_ 1000
-int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) {
+int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0);
- int writeLen = 0;
- uint8_t buffer[_SEND_FILE_STEP_] = { 0 };
+ int writeLen = 0;
+ uint8_t buffer[_SEND_FILE_STEP_] = {0};
for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) {
return writeLen;
- }
- else if (rlen < _SEND_FILE_STEP_) {
+ } else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file);
return (int)(writeLen + rlen);
- }
- else {
+ } else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
writeLen += _SEND_FILE_STEP_;
}
@@ -44,8 +42,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
size_t rlen = fread(buffer, 1, remain, in_file);
if (rlen <= 0) {
return writeLen;
- }
- else {
+ } else {
fwrite(buffer, 1, remain, out_file);
writeLen += remain;
}
@@ -54,7 +51,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return writeLen;
}
-ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
- uError("not implemented yet");
+int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) {
+ uError("taosSendFile not implemented yet");
return -1;
}
\ No newline at end of file
diff --git a/src/util/src/talloc.c b/src/os/src/detail/osAlloc.c
similarity index 88%
rename from src/util/src/talloc.c
rename to src/os/src/detail/osAlloc.c
index d3d8ee8116..4ca35793e7 100644
--- a/src/util/src/talloc.c
+++ b/src/os/src/detail/osAlloc.c
@@ -17,10 +17,10 @@
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
-#include "talloc.h"
+#include "osAlloc.h"
#define TSDB_HAVE_MEMALIGN
-#ifndef TSDB_USE_SYS_MEM
+#ifdef TAOS_OS_FUNC_ALLOC
void *tmalloc(int32_t size) {
void *p = malloc(size);
@@ -32,11 +32,11 @@ void *tmalloc(int32_t size) {
return p;
}
-void *tcalloc(int32_t size) {
- void *p = calloc(1, size);
+void *tcalloc(int32_t nmemb, int32_t size) {
+ void *p = calloc(nmemb, size);
if (p == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
- uError("failed to calloc memory, size:%d reason:%s", size, strerror(errno));
+ uError("failed to calloc memory, nmemb:%d size:%d reason:%s", nmemb, size, strerror(errno));
}
return p;
diff --git a/src/os/src/detail/osFail.c b/src/os/src/detail/osFail.c
index e0eb200851..a99bcd01db 100644
--- a/src/os/src/detail/osFail.c
+++ b/src/os/src/detail/osFail.c
@@ -20,7 +20,7 @@
#ifdef TAOS_RANDOM_NETWORK_FAIL
-ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) {
+int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET;
return -1;
@@ -29,8 +29,8 @@ ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) {
return send(sockfd, buf, len, flags);
}
-ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr,
- socklen_t addrlen) {
+int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags,
+ const struct sockaddr *dest_addr, socklen_t addrlen) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET;
return -1;
@@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags,
return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
}
-ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) {
+int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET;
return -1;
@@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) {
return read(fd, buf, count);
}
-ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) {
+int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = EINTR;
return -1;
@@ -61,10 +61,10 @@ ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) {
#ifdef TAOS_RANDOM_FILE_FAIL
-static int random_file_fail_factor = 20;
+static int32_t random_file_fail_factor = 20;
static FILE *fpRandomFileFailOutput = NULL;
-void taosSetRandomFileFailFactor(int factor) {
+void taosSetRandomFileFailFactor(int32_t factor) {
random_file_fail_factor = factor;
}
@@ -77,7 +77,7 @@ static void close_random_file_fail_output() {
}
}
-static void random_file_fail_output_sig(int sig) {
+static void random_file_fail_output_sig(int32_t sig) {
fprintf(fpRandomFileFailOutput, "signal %d received.\n", sig);
struct sigaction act = {0};
@@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) {
sigaction(SIGILL, &act, NULL);
}
-ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line) {
+int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) {
errno = EIO;
@@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file
}
}
- return taosTReadImp(fd, buf, count);
+ return taosReadImp(fd, buf, count);
}
-ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line) {
+int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) {
errno = EIO;
@@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *fil
}
}
- return taosTWriteImp(fd, buf, count);
+ return taosWriteImp(fd, buf, count);
}
-off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line) {
+int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) {
errno = EIO;
@@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, ui
}
}
- return lseek(fd, offset, whence);
+ return taosLSeekImp(fd, offset, whence);
}
#endif //TAOS_RANDOM_FILE_FAIL
diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c
index e0fa32c7d2..6eb4515f30 100644
--- a/src/os/src/detail/osFile.c
+++ b/src/os/src/detail/osFile.c
@@ -18,6 +18,7 @@
#include "tglobal.h"
#ifndef TAOS_OS_FUNC_FILE_GETTMPFILEPATH
+
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char *tdengineTmpFileNamePrefix = "tdengine-";
@@ -39,10 +40,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
}
+
#endif
-// rename file name
-int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath) {
+int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath) {
int32_t ts = taosGetTimestampSec();
char fname[PATH_MAX] = {0}; // max file name length must be less than 255
@@ -51,12 +52,13 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
if (delimiterPos == NULL) return -1;
int32_t fileNameLen = 0;
- if (suffix)
+ if (suffix) {
fileNameLen = snprintf(fname, PATH_MAX, "%s.%d.%s", delimiterPos + 1, ts, suffix);
- else
+ } else {
fileNameLen = snprintf(fname, PATH_MAX, "%s.%d", delimiterPos + 1, ts);
+ }
- size_t len = (size_t)((delimiterPos - fullPath) + fileNameLen + 1);
+ int32_t len = (int32_t)((delimiterPos - fullPath) + fileNameLen + 1);
if (*dstPath == NULL) {
*dstPath = calloc(1, len + 1);
if (*dstPath == NULL) return -1;
@@ -69,9 +71,9 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return rename(fullPath, *dstPath);
}
-ssize_t taosTReadImp(int fd, void *buf, size_t count) {
- size_t leftbytes = count;
- ssize_t readbytes;
+int64_t taosReadImp(int32_t fd, void *buf, int64_t count) {
+ int64_t leftbytes = count;
+ int64_t readbytes;
char * tbuf = (char *)buf;
while (leftbytes > 0) {
@@ -83,19 +85,19 @@ ssize_t taosTReadImp(int fd, void *buf, size_t count) {
return -1;
}
} else if (readbytes == 0) {
- return (ssize_t)(count - leftbytes);
+ return (int64_t)(count - leftbytes);
}
leftbytes -= readbytes;
tbuf += readbytes;
}
- return (ssize_t)count;
+ return count;
}
-ssize_t taosTWriteImp(int fd, void *buf, size_t n) {
- size_t nleft = n;
- ssize_t nwritten = 0;
+int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) {
+ int64_t nleft = n;
+ int64_t nwritten = 0;
char * tbuf = (char *)buf;
while (nleft > 0) {
@@ -110,13 +112,18 @@ ssize_t taosTWriteImp(int fd, void *buf, size_t n) {
tbuf += nwritten;
}
- return (ssize_t)n;
+ return n;
+}
+
+int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
+ return (int64_t)tlseek(fd, (long)offset, whence);
}
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
-ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
- size_t leftbytes = size;
- ssize_t sentbytes;
+
+int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) {
+ int64_t leftbytes = size;
+ int64_t sentbytes;
while (leftbytes > 0) {
/*
@@ -131,7 +138,7 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return -1;
}
} else if (sentbytes == 0) {
- return (ssize_t)(size - leftbytes);
+ return (int64_t)(size - leftbytes);
}
leftbytes -= sentbytes;
@@ -139,4 +146,17 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return size;
}
+
+int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size) {
+ return taosSendFile(fileno(outfile), fileno(infile), offset, size);
+}
+
+#endif
+
+#ifndef TAOS_OS_FUNC_FILE_FTRUNCATE
+
+int32_t taosFtruncate(int32_t fd, int64_t length) {
+ return ftruncate(fd, length);
+}
+
#endif
\ No newline at end of file
diff --git a/src/os/src/detail/osSocket.c b/src/os/src/detail/osSocket.c
index 8a51c389e9..c7c9d77427 100644
--- a/src/os/src/detail/osSocket.c
+++ b/src/os/src/detail/osSocket.c
@@ -19,8 +19,8 @@
#ifndef TAOS_OS_FUNC_SOCKET
-int taosSetNonblocking(SOCKET sock, int on) {
- int flags = 0;
+int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
+ int32_t flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
return 1;
@@ -43,7 +43,7 @@ void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
- int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
+ int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
uError("failed to block SIGPIPE");
}
@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
-int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
+int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
}
diff --git a/src/os/src/windows/wFile.c b/src/os/src/windows/wFile.c
index fa835ae820..734ed9916d 100644
--- a/src/os/src/windows/wFile.c
+++ b/src/os/src/windows/wFile.c
@@ -43,19 +43,19 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#define _SEND_FILE_STEP_ 1000
-int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) {
+int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0);
- int writeLen = 0;
+ int64_t writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = { 0 };
- for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
+ for (int64_t len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) {
return writeLen;
}
else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file);
- return (int)(writeLen + rlen);
+ return (int64_t)(writeLen + rlen);
}
else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
@@ -63,7 +63,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
}
}
- int remain = count - writeLen;
+ int64_t remain = count - writeLen;
if (remain > 0) {
size_t rlen = fread(buffer, 1, remain, in_file);
if (rlen <= 0) {
@@ -78,12 +78,12 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return writeLen;
}
-ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
- uError("taosTSendFileImp no implemented yet");
+int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) {
+ uError("taosSendFile no implemented yet");
return 0;
}
-int taosFtruncate(int fd, int64_t length) {
+int32_t taosFtruncate(int32_t fd, int64_t length) {
uError("taosFtruncate no implemented yet");
return 0;
}
\ No newline at end of file
diff --git a/src/os/src/windows/wSocket.c b/src/os/src/windows/wSocket.c
index da9242d6a3..3b091b2699 100644
--- a/src/os/src/windows/wSocket.c
+++ b/src/os/src/windows/wSocket.c
@@ -34,7 +34,7 @@ void taosWinSocketInit() {
}
}
-int taosSetNonblocking(SOCKET sock, int on) {
+int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
u_long mode;
if (on) {
mode = 1;
@@ -48,7 +48,7 @@ int taosSetNonblocking(SOCKET sock, int on) {
void taosBlockSIGPIPE() {}
-int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) {
+int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0;
}
@@ -72,7 +72,7 @@ int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int op
uint32_t taosInetAddr(char *ipAddr) {
uint32_t value;
- int ret = inet_pton(AF_INET, ipAddr, &value);
+ int32_t ret = inet_pton(AF_INET, ipAddr, &value);
if (ret <= 0) {
return INADDR_NONE;
} else {
diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c
index ad29cef5c2..529da78d9e 100644
--- a/src/query/src/qTsbuf.c
+++ b/src/query/src/qTsbuf.c
@@ -772,7 +772,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t offset = getDataStartOffset();
int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
- ssize_t rc = taosFSendFile(pDestBuf->f, pSrcBuf->f, &offset, size);
+ int64_t rc = taosFSendFile(pDestBuf->f, pSrcBuf->f, &offset, size);
if (rc == -1) {
// tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c
index 0137794d18..0fe189db7a 100644
--- a/src/sync/src/syncRetrieve.c
+++ b/src/sync/src/syncRetrieve.c
@@ -149,7 +149,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
int sfd = open(name, O_RDONLY);
if (sfd < 0) break;
- ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
+ ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd);
if (ret < 0) break;
@@ -406,7 +406,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
int sfd = open(fname, O_RDONLY);
if (sfd < 0) break;
- code = taosTSendFile(pPeer->syncFd, sfd, NULL, size);
+ code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd);
if (code < 0) break;
diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c
index 5633e97cdf..3a5416dd30 100644
--- a/src/tsdb/src/tsdbFile.c
+++ b/src/tsdb/src/tsdbFile.c
@@ -13,10 +13,8 @@
* along with this program. If not, see .
*/
#define _DEFAULT_SOURCE
-#include
-
#define TAOS_RANDOM_FILE_FAIL_TEST
-
+#include
#include "os.h"
#include "talgo.h"
#include "tchecksum.h"
@@ -428,7 +426,7 @@ int tsdbUpdateFileHeader(SFile *pFile) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
- if (taosTWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
+ if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@@ -493,7 +491,7 @@ int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
return -1;
}
- if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
+ if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE,
strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c
index a0ff1a6b95..9a7c2db3d3 100644
--- a/src/tsdb/src/tsdbMain.c
+++ b/src/tsdb/src/tsdbMain.c
@@ -582,7 +582,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
- if (taosTWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
+ if (taosWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -623,7 +623,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
goto _err;
}
- if (taosTRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
+ if (taosRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c
index 19f59f3080..d68a97eb13 100644
--- a/src/tsdb/src/tsdbMemTable.c
+++ b/src/tsdb/src/tsdbMemTable.c
@@ -322,6 +322,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
pMergeInfo->keyFirst = INT64_MAX;
pMergeInfo->keyLast = INT64_MIN;
+ if (pCols) tdResetDataCols(pCols);
row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) {
diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c
index c3f92e2bd1..98f815f78a 100644
--- a/src/tsdb/src/tsdbRWHelper.c
+++ b/src/tsdb/src/tsdbRWHelper.c
@@ -14,9 +14,7 @@
*/
#define _DEFAULT_SOURCE
-
#define TAOS_RANDOM_FILE_FAIL_TEST
-
#include "os.h"
#include "talgo.h"
#include "tchecksum.h"
@@ -338,7 +336,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
return -1;
}
- if (taosTSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
+ if (taosSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -383,7 +381,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
- if (taosTWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
+ if (taosWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -435,7 +433,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(offset == pFile->info.size);
- if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
+ if (taosWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -457,7 +455,7 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe
return -1;
}
- if (taosTRead(pFile->fd, buffer, len) < len) {
+ if (taosRead(pFile->fd, buffer, len) < len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len,
strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
@@ -554,7 +552,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
return -1;
}
- if (taosTRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
+ if (taosRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -611,7 +609,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
return -1;
}
- if (taosTRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
+ if (taosRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -826,7 +824,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
sizeof(TSCKSUM));
// Write the whole block to file
- if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
+ if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -1252,7 +1250,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
return -1;
}
- if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
+ if (taosRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -1367,7 +1365,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
- if (taosTRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
+ if (taosRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
@@ -1495,7 +1493,6 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT(pIdx->len > 0);
SCompBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
- tdResetDataCols(pDataCols);
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, pDataCols,
NULL, 0, pCfg->update, pMergeInfo);
@@ -1525,7 +1522,6 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
}
} else {
ASSERT(!pHelper->hasOldLastBlock);
- tdResetDataCols(pDataCols);
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0, pCfg->update, pMergeInfo);
ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows);
@@ -1571,7 +1567,6 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if ((!TSDB_IS_LAST_BLOCK(&oBlock)) && keyFirst < pCompBlock->keyFirst) {
while (true) {
- tdResetDataCols(pDataCols);
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, oBlock.keyFirst-1, defaultRowsInBlock, pDataCols, NULL, 0,
pCfg->update, pMergeInfo);
ASSERT(pMergeInfo->rowsInserted == pMergeInfo->nOperations && pMergeInfo->nOperations == pDataCols->numOfRows);
diff --git a/src/util/inc/tsocket.h b/src/util/inc/tsocket.h
index f14e8dbb35..391cc44acc 100644
--- a/src/util/inc/tsocket.h
+++ b/src/util/inc/tsocket.h
@@ -20,21 +20,21 @@
extern "C" {
#endif
-int taosReadn(SOCKET sock, char *buffer, int len);
-int taosWriteMsg(SOCKET fd, void *ptr, int nbytes);
-int taosReadMsg(SOCKET fd, void *ptr, int nbytes);
-int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes);
-int taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len);
-int taosSetNonblocking(SOCKET sock, int on);
+int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
+int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
+int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);
+int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes);
+int32_t taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len);
+int32_t taosSetNonblocking(SOCKET sock, int32_t on);
-SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
-SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
-SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
-int taosKeepTcpAlive(SOCKET sockFd);
+SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
+SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
+SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
+int32_t taosKeepTcpAlive(SOCKET sockFd);
-int taosGetFqdn(char *);
+int32_t taosGetFqdn(char *);
uint32_t taosGetIpFromFqdn(const char *);
-void tinet_ntoa(char *ipstr, unsigned int ip);
+void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
#ifdef __cplusplus
diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c
index 6ba1d87d92..0806c29ff8 100644
--- a/src/util/src/tkvstore.c
+++ b/src/util/src/tkvstore.c
@@ -14,9 +14,7 @@
*/
#define _DEFAULT_SOURCE
-
#define TAOS_RANDOM_FILE_FAIL_TEST
-
#include "os.h"
#include "hash.h"
#include "taoserror.h"
@@ -188,7 +186,7 @@ int tdKVStoreStartCommit(SKVStore *pStore) {
goto _err;
}
- if (taosTSendFile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
+ if (taosSendFile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
@@ -248,13 +246,13 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
ASSERT(tlen == POINTER_DISTANCE(pBuf, buf));
ASSERT(tlen == sizeof(SKVRecord));
- if (taosTWrite(pStore->fd, buf, tlen) < tlen) {
+ if (taosWrite(pStore->fd, buf, tlen) < tlen) {
uError("failed to write %d bytes to file %s since %s", tlen, pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
- if (taosTWrite(pStore->fd, cont, contLen) < contLen) {
+ if (taosWrite(pStore->fd, cont, contLen) < contLen) {
uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno));
return -1;
}
@@ -292,7 +290,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
void *pBuf = buf;
tdEncodeKVRecord(&pBuf, &rInfo);
- if (taosTWrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) {
+ if (taosWrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) {
uError("failed to write %" PRId64 " bytes to file %s since %s", (int64_t)(POINTER_DISTANCE(pBuf, buf)), pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@@ -339,7 +337,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) {
int fd = open(fname, O_RDONLY);
if (fd < 0) goto _err;
- if (taosTRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) goto _err;
+ if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) goto _err;
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) goto _err;
void *pBuf = (void *)buf;
@@ -368,7 +366,7 @@ static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t
return -1;
}
- if (taosTRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
+ if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@@ -402,7 +400,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
- if (taosTWrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
+ if (taosWrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@@ -535,7 +533,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE);
while (true) {
- ssize_t tsize = taosTRead(pStore->fd, tbuf, sizeof(SKVRecord));
+ int64_t tsize = taosRead(pStore->fd, tbuf, sizeof(SKVRecord));
if (tsize == 0) break;
if (tsize < sizeof(SKVRecord)) {
uError("failed to read %" PRIzu " bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname,
@@ -598,7 +596,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
goto _err;
}
- if (taosTRead(pStore->fd, buf, (size_t)pRecord->size) < pRecord->size) {
+ if (taosRead(pStore->fd, buf, (size_t)pRecord->size) < pRecord->size) {
uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname,
strerror(errno), pRecord->offset);
terrno = TAOS_SYSTEM_ERROR(errno);
diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c
index 09b0933fd6..0ad7917b39 100644
--- a/src/util/src/tlog.c
+++ b/src/util/src/tlog.c
@@ -336,11 +336,11 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
lseek(tsLogObj.logHandle->fd, 0, SEEK_END);
sprintf(name, "==================================================\n");
- taosTWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
+ taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, " new log file \n");
- taosTWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
+ taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, "==================================================\n");
- taosTWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
+ taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
return 0;
}
@@ -390,7 +390,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
- taosTWrite(tsLogObj.logHandle->fd, buffer, len);
+ taosWrite(tsLogObj.logHandle->fd, buffer, len);
}
if (tsLogObj.maxLines > 0) {
@@ -400,7 +400,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
}
}
- if (dflag & DEBUG_SCREEN) taosTWrite(1, buffer, (uint32_t)len);
+ if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len);
}
void taosDumpData(unsigned char *msg, int32_t len) {
@@ -419,7 +419,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
pos += 3;
if (c >= 16) {
temp[pos++] = '\n';
- taosTWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
+ taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
c = 0;
pos = 0;
}
@@ -427,9 +427,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
temp[pos++] = '\n';
- taosTWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
-
- return;
+ taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
}
void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) {
@@ -467,7 +465,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
- taosTWrite(tsLogObj.logHandle->fd, buffer, len);
+ taosWrite(tsLogObj.logHandle->fd, buffer, len);
}
if (tsLogObj.maxLines > 0) {
@@ -477,7 +475,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
}
}
- if (dflag & DEBUG_SCREEN) taosTWrite(1, buffer, (uint32_t)len);
+ if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len);
}
#if 0
@@ -606,7 +604,7 @@ static void *taosAsyncOutputLog(void *param) {
while (1) {
log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT);
if (log_size) {
- taosTWrite(tLogBuff->fd, tempBuffer, log_size);
+ taosWrite(tLogBuff->fd, tempBuffer, log_size);
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff);
} else {
break;
diff --git a/src/util/src/tnote.c b/src/util/src/tnote.c
index 4f05277a84..9536f6fb70 100644
--- a/src/util/src/tnote.c
+++ b/src/util/src/tnote.c
@@ -265,7 +265,7 @@ void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...)
buffer[len] = 0;
if (pNote->taosNoteFd >= 0) {
- taosTWrite(pNote->taosNoteFd, buffer, (unsigned int)len);
+ taosWrite(pNote->taosNoteFd, buffer, (unsigned int)len);
if (pNote->taosNoteMaxLines > 0) {
pNote->taosNoteLines++;
diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c
index 4cf73e6dff..1003bc6178 100644
--- a/src/util/src/tsocket.c
+++ b/src/util/src/tsocket.c
@@ -18,7 +18,7 @@
#include "tsocket.h"
#include "taoserror.h"
-int taosGetFqdn(char *fqdn) {
+int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
@@ -26,10 +26,10 @@ int taosGetFqdn(char *fqdn) {
return -1;
}
- struct addrinfo hints = {0};
+ struct addrinfo hints = {0};
struct addrinfo *result = NULL;
hints.ai_flags = AI_CANONNAME;
- int ret = getaddrinfo(hostname, NULL, &hints, &result);
+ int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
@@ -49,10 +49,10 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
- struct sockaddr *sa = result->ai_addr;
- struct sockaddr_in *si = (struct sockaddr_in*)sa;
- struct in_addr ia = si->sin_addr;
- uint32_t ip = ia.s_addr;
+ struct sockaddr * sa = result->ai_addr;
+ struct sockaddr_in *si = (struct sockaddr_in *)sa;
+ struct in_addr ia = si->sin_addr;
+ uint32_t ip = ia.s_addr;
freeaddrinfo(result);
return ip;
} else {
@@ -70,7 +70,7 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
}
}
-// Function converting an IP address string to an unsigned int.
+// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
@@ -81,7 +81,7 @@ uint32_t ip2uint(const char *const ip_addr) {
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
- int k;
+ int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
@@ -95,17 +95,17 @@ uint32_t ip2uint(const char *const ip_addr) {
ip[k] = '\0';
- return *((unsigned int *)ip);
+ return *((uint32_t *)ip);
}
-int taosWriteMsg(SOCKET fd, void *buf, int nbytes) {
- int nleft, nwritten;
- char *ptr = (char *)buf;
+int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
+ int32_t nleft, nwritten;
+ char * ptr = (char *)buf;
nleft = nbytes;
while (nleft > 0) {
- nwritten = (int)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
+ nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
if (nwritten <= 0) {
if (errno == EINTR)
continue;
@@ -120,16 +120,16 @@ int taosWriteMsg(SOCKET fd, void *buf, int nbytes) {
return (nbytes - nleft);
}
-int taosReadMsg(SOCKET fd, void *buf, int nbytes) {
- int nleft, nread;
- char *ptr = (char *)buf;
+int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
+ int32_t nleft, nread;
+ char * ptr = (char *)buf;
nleft = nbytes;
if (fd < 0) return -1;
while (nleft > 0) {
- nread = (int)taosReadSocket(fd, ptr, (size_t)nleft);
+ nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft);
if (nread == 0) {
break;
} else if (nread < 0) {
@@ -147,11 +147,11 @@ int taosReadMsg(SOCKET fd, void *buf, int nbytes) {
return (nbytes - nleft);
}
-int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
+int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
taosSetNonblocking(fd, 1);
- int nleft, nwritten, nready;
- fd_set fset;
+ int32_t nleft, nwritten, nready;
+ fd_set fset;
struct timeval tv;
nleft = nbytes;
@@ -160,7 +160,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
- if ((nready = select((int)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
+ if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
uError("fd %d timeout, no enough space to write", fd);
break;
@@ -172,7 +172,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return -1;
}
- nwritten = (int)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
+ nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
if (nwritten <= 0) {
if (errno == EAGAIN || errno == EINTR) continue;
@@ -189,10 +189,10 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return (nbytes - nleft);
}
-int taosReadn(SOCKET fd, char *ptr, int nbytes) {
- int nread, nready, nleft = nbytes;
+int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
+ int32_t nread, nready, nleft = nbytes;
- fd_set fset;
+ fd_set fset;
struct timeval tv;
while (nleft > 0) {
@@ -200,7 +200,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
- if ((nready = select((int)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
+ if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
uError("fd %d timeout\n", fd);
break;
@@ -210,7 +210,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
return -1;
}
- if ((nread = (int)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
+ if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
if (errno == EINTR) continue;
uError("read error, %d (%s)", errno, strerror(errno));
return -1;
@@ -229,8 +229,8 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr;
- SOCKET sockFd;
- int bufSize = 1024000;
+ SOCKET sockFd;
+ int32_t bufSize = 1024000;
uDebug("open udp socket:0x%x:%hu", ip, port);
@@ -239,7 +239,7 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port);
- if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
+ if ((sockFd = (int32_t)socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
uError("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
@@ -268,9 +268,9 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
}
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
- SOCKET sockFd = 0;
+ SOCKET sockFd = 0;
+ int32_t ret;
struct sockaddr_in serverAddr, clientAddr;
- int ret;
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
@@ -281,7 +281,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
}
/* set REUSEADDR option, so the portnumber can be re-used */
- int reuse = 1;
+ int32_t reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
@@ -296,8 +296,8 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
/* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
- uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)",
- clientIp, destIp, destPort, strerror(errno));
+ uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
+ strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
@@ -311,7 +311,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) {
- //uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
+ // uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
} else {
@@ -321,36 +321,36 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
return sockFd;
}
-int taosKeepTcpAlive(SOCKET sockFd) {
- int alive = 1;
+int32_t taosKeepTcpAlive(SOCKET sockFd) {
+ int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
- int probes = 3;
+ int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
- int alivetime = 10;
+ int32_t alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
- int interval = 3;
+ int32_t interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
- int nodelay = 1;
+ int32_t nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
@@ -371,8 +371,8 @@ int taosKeepTcpAlive(SOCKET sockFd) {
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
- SOCKET sockFd;
- int reuse;
+ SOCKET sockFd;
+ int32_t reuse;
uDebug("open tcp server socket:0x%x:%hu", ip, port);
@@ -381,7 +381,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port);
- if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
+ if ((sockFd = (int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
uError("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
@@ -417,38 +417,38 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return sockFd;
}
-void tinet_ntoa(char *ipstr, unsigned int ip) {
+void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}
#define COPY_SIZE 32768
// sendfile shall be used
-int taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len) {
+int32_t taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len) {
int64_t leftLen;
- int readLen, writeLen;
+ int32_t readLen, writeLen;
char temp[COPY_SIZE];
leftLen = len;
while (leftLen > 0) {
if (leftLen < COPY_SIZE)
- readLen = (int)leftLen;
+ readLen = (int32_t)leftLen;
else
readLen = COPY_SIZE; // 4K
- int retLen = taosReadMsg(sfd, temp, (int)readLen);
+ int32_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
if (readLen != retLen) {
- uError("read error, readLen:%d retLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, retLen, len, leftLen,
- strerror(errno));
+ uError("read error, readLen:%d retLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, retLen, len,
+ leftLen, strerror(errno));
return -1;
}
writeLen = taosWriteMsg(dfd, temp, readLen);
if (readLen != writeLen) {
- uError("copy error, readLen:%d writeLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, writeLen, len, leftLen,
- strerror(errno));
+ uError("copy error, readLen:%d writeLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, writeLen,
+ len, leftLen, strerror(errno));
return -1;
}
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 753076372f..81e6bf0b47 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -255,8 +255,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->fversion = pVnode->version;
- pVnode->wqueue = dnodeAllocateVnodeWqueue(pVnode);
- pVnode->rqueue = dnodeAllocateVnodeRqueue(pVnode);
+ pVnode->wqueue = dnodeAllocVWriteQueue(pVnode);
+ pVnode->rqueue = dnodeAllocVReadQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
vnodeCleanUp(pVnode);
return terrno;
@@ -322,7 +322,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue;
- syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp;
+ syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
@@ -387,6 +387,10 @@ void vnodeRelease(void *pVnodeRaw) {
pVnode->qMgmt = NULL;
}
+ if (pVnode->wal) {
+ walStop(pVnode->wal);
+ }
+
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
@@ -405,12 +409,12 @@ void vnodeRelease(void *pVnodeRaw) {
}
if (pVnode->wqueue) {
- dnodeFreeVnodeWqueue(pVnode->wqueue);
+ dnodeFreeVWriteQueue(pVnode->wqueue);
pVnode->wqueue = NULL;
}
if (pVnode->rqueue) {
- dnodeFreeVnodeRqueue(pVnode->rqueue);
+ dnodeFreeVReadQueue(pVnode->rqueue);
pVnode->rqueue = NULL;
}
diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h
index 7e731d44db..5273eb5b1c 100644
--- a/src/wal/inc/walInt.h
+++ b/src/wal/inc/walInt.h
@@ -49,6 +49,8 @@ typedef struct {
int32_t level;
int32_t fsyncPeriod;
int32_t fsyncSeq;
+ int8_t stop;
+ int8_t reserved[3];
char path[WAL_PATH_LEN];
char name[WAL_FILE_LEN];
pthread_mutex_t mutex;
diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c
index 272f44b93a..c8f0274174 100644
--- a/src/wal/src/walMgmt.c
+++ b/src/wal/src/walMgmt.c
@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
-#include "talloc.h"
#include "tref.h"
#include "twal.h"
#include "walInt.h"
@@ -56,7 +55,7 @@ void walCleanUp() {
}
void *walOpen(char *path, SWalCfg *pCfg) {
- SWal *pWal = tcalloc(sizeof(SWal));
+ SWal *pWal = tcalloc(1, sizeof(SWal));
if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
@@ -110,6 +109,16 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) {
return TSDB_CODE_SUCCESS;
}
+void walStop(void *handle) {
+ if (handle == NULL) return;
+ SWal *pWal = handle;
+
+ pthread_mutex_lock(&pWal->mutex);
+ pWal->stop = 1;
+ pthread_mutex_unlock(&pWal->mutex);
+ wDebug("vgId:%d, stop write wal", pWal->vgId);
+}
+
void walClose(void *handle) {
if (handle == NULL) return;
@@ -123,9 +132,7 @@ void walClose(void *handle) {
while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
- if (fileId == pWal->fileId) {
- wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
- } else if (remove(pWal->name) < 0) {
+ if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else {
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c
index 00dc3f4744..e57cb0e042 100644
--- a/src/wal/src/walWrite.c
+++ b/src/wal/src/walWrite.c
@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
+#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
-#include "talloc.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "twal.h"
@@ -29,10 +29,15 @@ int32_t walRenew(void *handle) {
SWal * pWal = handle;
int32_t code = 0;
+ if (pWal->stop) {
+ wDebug("vgId:%d, do not create a new wal file", pWal->vgId);
+ return 0;
+ }
+
pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) {
- close(pWal->fd);
+ tclose(pWal->fd);
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
}
@@ -90,7 +95,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
pthread_mutex_lock(&pWal->mutex);
- if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
+ if (taosWrite(pWal->fd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else {
@@ -151,7 +156,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
if (!pWal->keep) return TSDB_CODE_SUCCESS;
if (count == 0) {
- wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name);
+ wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
return walRenew(pWal);
} else {
// open the existing WAL file in append mode
@@ -204,7 +209,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, i
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
- if (taosTRead(fd, pHead, sizeof(SWalHead)) <= 0) {
+ if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
@@ -245,7 +250,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead *pHead = buffer;
while (1) {
- int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead));
+ int32_t ret = taosRead(fd, pHead, sizeof(SWalHead));
if (ret == 0) break;
if (ret < 0) {
@@ -282,7 +287,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pHead = buffer;
}
- ret = taosTRead(fd, pHead->cont, pHead->len);
+ ret = taosRead(fd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
@@ -305,7 +310,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}
- close(fd);
+ tclose(fd);
tfree(buffer);
return code;
diff --git a/tests/pytest/update/append_commit_last-0.py b/tests/pytest/update/append_commit_last-0.py
new file mode 100644
index 0000000000..c884207f2b
--- /dev/null
+++ b/tests/pytest/update/append_commit_last-0.py
@@ -0,0 +1,90 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import sys
+import taos
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.dnodes import *
+
+
+class TDTestCase:
+ def init(self, conn, logSql):
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor())
+
+ self.ts = 1604298064000
+
+ def restartTaosd(self):
+ tdDnodes.stop(1)
+ tdDnodes.startWithoutSleep(1)
+ tdSql.execute("use db")
+
+ def run(self):
+ tdSql.prepare()
+
+ print("==============step1")
+ tdSql.execute("create table t1 (ts timestamp, a int)")
+
+ for i in range(10):
+ tdSql.execute("insert into t1 values(%d, 1)" % (self.ts + i))
+ self.restartTaosd()
+ tdSql.query("select * from t1")
+ tdSql.checkRows(i + 1)
+ tdSql.query("select sum(a) from t1")
+ tdSql.checkData(0, 0, i + 1)
+
+ print("==============step2")
+ tdSql.execute("create table t2 (ts timestamp, a int)")
+ tdSql.execute("insert into t2 values(%d, 1)" % self.ts)
+ self.restartTaosd()
+ tdSql.query("select * from t2")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 1, 1)
+
+ for i in range(1, 151):
+ tdSql.execute("insert into t2 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t2")
+ tdSql.checkRows(151)
+ tdSql.query("select sum(a) from t2")
+ tdSql.checkData(0, 0, 151)
+
+
+ print("==============step3")
+ tdSql.execute("create table t3 (ts timestamp, a int)")
+ tdSql.execute("insert into t3 values(%d, 1)" % self.ts)
+ self.restartTaosd()
+ tdSql.query("select * from t3")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 1, 1)
+
+ for i in range(8):
+ for j in range(1, 11):
+ tdSql.execute("insert into t3 values(%d, 1)" % (self.ts + i * 10 + j))
+
+ self.restartTaosd()
+ tdSql.query("select * from t3")
+ tdSql.checkRows(81)
+ tdSql.query("select sum(a) from t3")
+ tdSql.checkData(0, 0, 81)
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
diff --git a/tests/pytest/update/merge_commit_last-0.py b/tests/pytest/update/merge_commit_last-0.py
new file mode 100644
index 0000000000..8a247f3809
--- /dev/null
+++ b/tests/pytest/update/merge_commit_last-0.py
@@ -0,0 +1,309 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import sys
+import taos
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.dnodes import *
+
+
+class TDTestCase:
+ def init(self, conn, logSql):
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor())
+
+ self.ts = 1603152000000
+
+ def restartTaosd(self):
+ tdDnodes.stop(1)
+ tdDnodes.startWithoutSleep(1)
+ tdSql.execute("use db")
+
+ def run(self):
+ tdSql.prepare()
+
+ print("==============step 1: UPDATE THE LAST RECORD REPEATEDLY")
+ tdSql.execute("create table t1 (ts timestamp, a int)")
+
+ for i in range(5):
+ tdSql.execute("insert into t1 values(%d, %d)" % (self.ts, i))
+ self.restartTaosd()
+ tdSql.query("select * from t1")
+ tdSql.checkRows(1)
+ tdSql.checkData(0, 1, 0)
+
+ print("==============step 2: UPDATE THE WHOLE LAST BLOCK")
+ tdSql.execute("create table t2 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t2 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t2")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t2")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(50):
+ tdSql.execute("insert into t2 values(%d, 2)" % (self.ts + i))
+ tdSql.query("select * from t2")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t2")
+ tdSql.checkData(0, 0, 50)
+
+ self.restartTaosd()
+ tdSql.query("select * from t2")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t2")
+ tdSql.checkData(0, 0, 50)
+
+ print("==============step 3: UPDATE PART OF THE LAST BLOCK")
+ tdSql.execute("create table t3 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t3 values(%d, 1)" % (self.ts + i))
+ self.restartTaosd()
+ tdSql.query("select * from t3")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t3")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(25):
+ tdSql.execute("insert into t3 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t3")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t3")
+ tdSql.checkData(0, 0, 50)
+
+ self.restartTaosd()
+ tdSql.query("select * from t3")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t3")
+ tdSql.checkData(0, 0, 50)
+
+ print("==============step 4: UPDATE AND INSERT APPEND AT END OF DATA")
+ tdSql.execute("create table t4 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t4 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t4")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t4")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(25):
+ tdSql.execute("insert into t4 values(%d, 2)" % (self.ts + i))
+
+ for i in range(50, 60):
+ tdSql.execute("insert into t4 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t4")
+ tdSql.checkRows(60)
+ tdSql.query("select sum(a) from t4")
+ tdSql.checkData(0, 0, 70)
+
+ self.restartTaosd()
+ tdSql.query("select * from t4")
+ tdSql.checkRows(60)
+ tdSql.query("select sum(a) from t4")
+ tdSql.checkData(0, 0, 70)
+
+ print("==============step 5: UPDATE AND INSERT PREPEND SOME DATA")
+ tdSql.execute("create table t5 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t5 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t5")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t5")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(-10, 0):
+ tdSql.execute("insert into t5 values(%d, 2)" % (self.ts + i))
+
+ for i in range(25):
+ tdSql.execute("insert into t5 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t5")
+ tdSql.checkRows(60)
+ tdSql.query("select sum(a) from t5")
+ tdSql.checkData(0, 0, 70)
+
+ self.restartTaosd()
+ tdSql.query("select * from t5")
+ tdSql.checkRows(60)
+ tdSql.query("select sum(a) from t5")
+ tdSql.checkData(0, 0, 70)
+
+ for i in range(-10, 0):
+ tdSql.execute("insert into t5 values(%d, 3)" % (self.ts + i))
+
+ for i in range(25, 50):
+ tdSql.execute("insert into t5 values(%d, 3)" % (self.ts + i))
+
+ tdSql.query("select * from t5")
+ tdSql.checkRows(60)
+ tdSql.query("select sum(a) from t5")
+ tdSql.checkData(0, 0, 70)
+
+ self.restartTaosd()
+ tdSql.query("select * from t5")
+ tdSql.checkRows(60)
+ tdSql.query("select sum(a) from t5")
+ tdSql.checkData(0, 0, 70)
+
+
+ print("==============step 6: INSERT AHEAD A LOT OF DATA")
+ tdSql.execute("create table t6 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t6 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t6")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t6")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(-1000, 0):
+ tdSql.execute("insert into t6 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t6")
+ tdSql.checkRows(1050)
+ tdSql.query("select sum(a) from t6")
+ tdSql.checkData(0, 0, 2050)
+
+ self.restartTaosd()
+ tdSql.query("select * from t6")
+ tdSql.checkRows(1050)
+ tdSql.query("select sum(a) from t6")
+ tdSql.checkData(0, 0, 2050)
+
+ print("==============step 7: INSERT AHEAD A LOT AND UPDATE")
+ tdSql.execute("create table t7 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t7 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t7")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t7")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(-1000, 25):
+ tdSql.execute("insert into t7 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t7")
+ tdSql.checkRows(1050)
+ tdSql.query("select sum(a) from t7")
+ tdSql.checkData(0, 0, 2050)
+
+ self.restartTaosd()
+ tdSql.query("select * from t7")
+ tdSql.checkRows(1050)
+ tdSql.query("select sum(a) from t7")
+ tdSql.checkData(0, 0, 2050)
+
+ print("==============step 8: INSERT AFTER A LOT AND UPDATE")
+ tdSql.execute("create table t8 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t8 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t8")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t8")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(25, 6000):
+ tdSql.execute("insert into t8 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t8")
+ tdSql.checkRows(6000)
+ tdSql.query("select sum(a) from t8")
+ tdSql.checkData(0, 0, 11950)
+
+ self.restartTaosd()
+ tdSql.query("select * from t8")
+ tdSql.checkRows(6000)
+ tdSql.query("select sum(a) from t8")
+ tdSql.checkData(0, 0, 11950)
+
+ print("==============step 9: UPDATE ONLY MIDDLE")
+ tdSql.execute("create table t9 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t9 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t9")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t9")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(20, 30):
+ tdSql.execute("insert into t9 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t9")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t9")
+ tdSql.checkData(0, 0, 50)
+
+ self.restartTaosd()
+ tdSql.query("select * from t9")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t9")
+ tdSql.checkData(0, 0, 50)
+
+ print("==============step 10: A LOT OF DATA COVER THE WHOLE BLOCK")
+ tdSql.execute("create table t10 (ts timestamp, a int)")
+
+ for i in range(50):
+ tdSql.execute("insert into t10 values(%d, 1)" % (self.ts + i))
+
+ self.restartTaosd()
+ tdSql.query("select * from t10")
+ tdSql.checkRows(50)
+ tdSql.query("select sum(a) from t10")
+ tdSql.checkData(0, 0, 50)
+
+ for i in range(-4000, 4000):
+ tdSql.execute("insert into t10 values(%d, 2)" % (self.ts + i))
+
+ tdSql.query("select * from t10")
+ tdSql.checkRows(8000)
+ tdSql.query("select sum(a) from t10")
+ tdSql.checkData(0, 0, 15950)
+
+ self.restartTaosd()
+ tdSql.query("select * from t10")
+ tdSql.checkRows(8000)
+ tdSql.query("select sum(a) from t10")
+ tdSql.checkData(0, 0, 15950)
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
diff --git a/tests/pytest/updatetest.sh b/tests/pytest/updatetest.sh
new file mode 100644
index 0000000000..ade1180553
--- /dev/null
+++ b/tests/pytest/updatetest.sh
@@ -0,0 +1,10 @@
+# update
+python3 ./test.py -f update/allow_update.py
+python3 ./test.py -f update/allow_update-0.py
+python3 ./test.py -f update/append_commit_data.py
+python3 ./test.py -f update/append_commit_last-0.py
+python3 ./test.py -f update/append_commit_last.py
+python3 ./test.py -f update/merge_commit_data.py
+python3 ./test.py -f update/merge_commit_data2.py
+python3 ./test.py -f update/merge_commit_last-0.py
+python3 ./test.py -f update/merge_commit_last.py
\ No newline at end of file
diff --git a/tests/script/general/insert/basic.sim b/tests/script/general/insert/basic.sim
index ba8cff83fa..3f0f25a95b 100644
--- a/tests/script/general/insert/basic.sim
+++ b/tests/script/general/insert/basic.sim
@@ -8,8 +8,8 @@ sleep 3000
sql connect
$i = 0
-$dbPrefix = tb_in_db
-$tbPrefix = tb_in_tb
+$dbPrefix = d
+$tbPrefix = t
$db = $dbPrefix . $i
$tb = $tbPrefix . $i
@@ -22,28 +22,27 @@ sql create table $tb (ts timestamp, speed int)
$x = 0
while $x < 10
- $ms = $x . m
- sql insert into $tb values (now + $ms , $x )
+ $cc = $x * 60000
+ $ms = 1601481600000 + $cc
+
+ sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
print =============== step 2
-sql insert into $tb values (now - 5m , 10)
-sql insert into $tb values (now - 6m , 10)
-sql insert into $tb values (now - 7m , 10)
-sql insert into $tb values (now - 8m , 10)
+$x = 0
+while $x < 5
+ $cc = $x * 60000
+ $ms = 1551481600000 + $cc
+
+ sql insert into $tb values ($ms , $x )
+ $x = $x + 1
+endw
sql select * from $tb
print $rows points data are retrieved
-if $rows != 14 then
- return -1
-endi
-
-sql drop database $db
-sleep 1000
-sql show databases
-if $rows != 0 then
+if $rows != 15 then
return -1
endi
diff --git a/tests/test-all.sh b/tests/test-all.sh
index e45dd15fed..f4e992eb5a 100755
--- a/tests/test-all.sh
+++ b/tests/test-all.sh
@@ -9,7 +9,7 @@ NC='\033[0m'
function runSimCaseOneByOne {
while read -r line; do
- if [[ $line =~ ^./test.sh* ]]; then
+ if [[ $line =~ ^./test.sh* ]] || [[ $line =~ ^run* ]]; then
case=`echo $line | grep sim$ |awk '{print $NF}'`
start_time=`date +%s`