From 6e14f9dc8ba8e3a3ef6839f2123973dc33ad38c8 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 9 Nov 2019 14:37:44 +0800 Subject: [PATCH 01/55] Client and server versions may not match --- src/client/src/tscSql.c | 43 +++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a2d4abbca3..0e1fead70e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -138,34 +138,35 @@ TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port) { if (taos != NULL) { STscObj* pObj = (STscObj*) taos; - int clientVersionNumber[4] = {0}; - if (!taosGetVersionNumber(version, clientVersionNumber)) { - tscError("taos:%p, invalid client version:%s", taos, version); - //pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; - globalCode = TSDB_CODE_INVALID_CLIENT_VERSION; - taos_close(taos); - return NULL; - } - - char *server_version = taos_get_server_info(taos); - int serverVersionNumber[4] = {0}; - if (!taosGetVersionNumber(server_version, serverVersionNumber)) { - tscError("taos:%p, invalid server version:%s", taos, server_version); - //pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; - globalCode = TSDB_CODE_INVALID_CLIENT_VERSION; - taos_close(taos); - return NULL; - } - // version compare only requires the first 3 segments of the version string int32_t comparedSegments = 3; + char client_version[64] = {0}; + char server_version[64] = {0}; + int clientVersionNumber[4] = {0}; + int serverVersionNumber[4] = {0}; + + strcpy(client_version, version); + strcpy(server_version, taos_get_server_info(taos)); + + if (!taosGetVersionNumber(client_version, clientVersionNumber)) { + tscError("taos:%p, invalid client version:%s", taos, client_version); + pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; + taos_close(taos); + return NULL; + } + + if (!taosGetVersionNumber(server_version, serverVersionNumber)) { + tscError("taos:%p, invalid server version:%s", taos, server_version); + pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; + taos_close(taos); + return NULL; + } for(int32_t i = 0; i < comparedSegments; ++i) { if (clientVersionNumber[i] != serverVersionNumber[i]) { tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection", taos, i, server_version, version); - //pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; - globalCode = TSDB_CODE_INVALID_CLIENT_VERSION; + pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; taos_close(taos); return NULL; } From 0d2eb1bf0848bca00377a8ac5515f632c622dfa5 Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 15 Nov 2019 18:44:00 +0800 Subject: [PATCH 02/55] [TBASE-1155] --- src/os/darwin/src/tdarwin.c | 4 ---- src/os/linux/src/tsystem.c | 19 ------------------- src/os/windows/src/twindows.c | 4 ---- src/system/src/vnodeImport.c | 3 ++- src/util/src/tutil.c | 19 +++++++++++++++++++ 5 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/os/darwin/src/tdarwin.c b/src/os/darwin/src/tdarwin.c index 2f97c7c376..71e8a56466 100644 --- a/src/os/darwin/src/tdarwin.c +++ b/src/os/darwin/src/tdarwin.c @@ -243,10 +243,6 @@ int taosInitTimer(void (*callback)(int), int ms) { return setitimer(ITIMER_REAL, &tv, NULL); } -char *taosCharsetReplace(char *charsetstr) { - return charsetstr; -} - void taosGetSystemTimezone() { // get and set default timezone SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); diff --git a/src/os/linux/src/tsystem.c b/src/os/linux/src/tsystem.c index 0989f007b3..e1f771759a 100644 --- a/src/os/linux/src/tsystem.c +++ b/src/os/linux/src/tsystem.c @@ -210,25 +210,6 @@ void taosGetSystemTimezone() { pPrint("timezone not configured, set to system default:%s", tsTimezone); } -typedef struct CharsetPair { - char *oldCharset; - char *newCharset; -} CharsetPair; - -char *taosCharsetReplace(char *charsetstr) { - CharsetPair charsetRep[] = { - {"utf8", "UTF-8"}, {"936", "CP936"}, - }; - - for (int32_t i = 0; i < tListLen(charsetRep); ++i) { - if (strcasecmp(charsetRep[i].oldCharset, charsetstr) == 0) { - return strdup(charsetRep[i].newCharset); - } - } - - return strdup(charsetstr); -} - /* * POSIX format locale string: * (Language Strings)_(Country/Region Strings).(code_page) diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 75bc8c2839..53563341c4 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -96,10 +96,6 @@ void __sync_val_restore_32(int32_t *ptr, int32_t newval) { void tsPrintOsInfo() {} -char *taosCharsetReplace(char *charsetstr) { - return charsetstr; -} - void taosGetSystemTimezone() { // get and set default timezone SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 9691206bf9..bfffc66aa5 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -546,6 +546,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { return code; } + assert(rows); dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); @@ -781,7 +782,7 @@ int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) { pImport->importedRows = rows; code = vnodeImportToCache(pImport, payload, rows); } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to cache", pObj->vnode, pObj->sid, pObj->meterId); + dTrace("vid:%d sid:%d id:%s, data is already imported to cache, firstKey:%lld", pObj->vnode, pObj->sid, pObj->meterId, pImport->firstKey); } return code; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index cb93319900..0b407a848b 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -508,3 +508,22 @@ char *taosIpStr(int ipInt) { sprintf(ipStr, "0x%x:%d.%d.%d.%d", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, ipInt >> 24); return ipStr; } + +typedef struct CharsetPair { + char *oldCharset; + char *newCharset; +} CharsetPair; + +char *taosCharsetReplace(char *charsetstr) { + CharsetPair charsetRep[] = { + { "utf8", "UTF-8" }, { "936", "CP936" }, + }; + + for (int32_t i = 0; i < tListLen(charsetRep); ++i) { + if (strcasecmp(charsetRep[i].oldCharset, charsetstr) == 0) { + return strdup(charsetRep[i].newCharset); + } + } + + return strdup(charsetstr); +} \ No newline at end of file From 6bc4ab252bde7a29b384e7b6786928647d019d7f Mon Sep 17 00:00:00 2001 From: huili Date: Sat, 30 Nov 2019 18:08:57 +0800 Subject: [PATCH 03/55] [modify the name of the client install package] --- packaging/tools/makeclient.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/tools/makeclient.sh b/packaging/tools/makeclient.sh index 0e52e0970a..7e22116ac5 100755 --- a/packaging/tools/makeclient.sh +++ b/packaging/tools/makeclient.sh @@ -20,7 +20,7 @@ release_dir="${top_dir}/release" community_dir="${script_dir}/../../../community/src" package_name='linux' -install_dir="${release_dir}/TDengine-client-enterprise-${version}-${package_name}-$(echo ${build_time}| tr ': ' -)" +install_dir="${release_dir}/taos-client-${version}-${package_name}-$(echo ${build_time}| tr ': ' -)" # Directories and files. bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${script_dir}/remove_client.sh" From 27f52b89759cfc1c25c2b4a25410803b1fbecc3e Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 30 Nov 2019 22:37:22 +0800 Subject: [PATCH 04/55] fix #808 --- src/rpc/src/trpc.c | 49 ++++++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index d84746817d..9f006ab05a 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -1059,6 +1059,16 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) { return msgLen; } +void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn) +{ + SSchedMsg schedMsg; + schedMsg.fp = taosProcessSchedMsg; + schedMsg.msg = NULL; + schedMsg.ahandle = pConn->ahandle; + schedMsg.thandle = pConn; + taosScheduleTask(pChann->qhandle, &schedMsg); +} + void taosProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; if (pConn->signature != param) { @@ -1074,22 +1084,20 @@ void taosProcessIdleTimer(void *param, void *tmrId) { return; } + int reportDisc = 0; + pthread_mutex_lock(&pChann->mutex); tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn); if (pConn->rspReceived == 0) { pConn->rspReceived = 1; - - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + reportDisc = 1; } pthread_mutex_unlock(&pChann->mutex); + + if (reportDisc) taosReportDisconnection(pChann, pConn); } void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, @@ -1114,11 +1122,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por pConn->meterId, pConn); pConn->rspReceived = 1; pConn->chandle = NULL; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + taosReportDisconnection(pChann, pConn); } tfree(data); return NULL; @@ -1330,6 +1334,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { STaosHeader *pHeader = NULL; SRpcConn * pConn = (SRpcConn *)param; int msgLen; + int reportDisc = 0; if (pConn->signature != param) { tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); @@ -1379,13 +1384,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); if (pConn->rspReceived == 0) { pConn->rspReceived = 1; - - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + reportDisc = 1; } } } @@ -1397,6 +1396,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { pthread_mutex_unlock(&pChann->mutex); + if (reportDisc) taosReportDisconnection(pChann, pConn); } void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) { @@ -1443,22 +1443,19 @@ void taosStopRpcConn(void *thandle) { tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn); + int reportDisc = 0; pthread_mutex_lock(&pChann->mutex); if (pConn->outType) { pConn->rspReceived = 1; - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; + reportDisc = 1; pthread_mutex_unlock(&pChann->mutex); - - taosScheduleTask(pChann->qhandle, &schedMsg); } else { pthread_mutex_unlock(&pChann->mutex); taosCloseRpcConn(pConn); } + + if (reportDisc) taosReportDisconnection(pChann, pConn); } int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { From 0224972e63befb26827cf85bfd258229f26be7dd Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 30 Nov 2019 23:17:58 +0800 Subject: [PATCH 05/55] fix some release scripts --- packaging/release.sh | 6 +++--- packaging/tools/install.sh | 4 ++-- packaging/tools/release_note | 11 +++++++++++ src/inc/tsdb.h | 5 ++++- src/util/src/version.c | 4 ++-- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/packaging/release.sh b/packaging/release.sh index 6a00f0b79d..58f69589d8 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -123,11 +123,11 @@ cd ${compile_dir} # arm only support lite ver if [ -z "$armver" ]; then - cmake ${top_dir}/../ + cmake ../ elif [ "$armver" == "arm64" ]; then - cmake ${top_dir}/../ -DVERSION=lite -DARMVER=arm64 + cmake ../ -DARMVER=arm64 elif [ "$armver" == "arm32" ]; then - cmake ${top_dir}/../ -DVERSION=lite -DARMVER=arm32 + cmake ../ -DARMVER=arm32 else echo "input parameter error!!!" return diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 2911452fb4..019e614e0b 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -313,9 +313,9 @@ vercomp () { function is_version_compatible() { - curr_version=$(${bin_dir}/taosd -V | cut -d ' ' -f 2) + curr_version=$(${bin_dir}/taosd -V | head -1 | cut -d ' ' -f 2) - min_compatible_version=$(${script_dir}/bin/taosd -V | cut -d ' ' -f 4) + min_compatible_version=$(${script_dir}/bin/taosd -V | head -1 | cut -d ' ' -f 4) vercomp $curr_version $min_compatible_version case $? in diff --git a/packaging/tools/release_note b/packaging/tools/release_note index 3a3cd81ca9..4578a4523c 100644 --- a/packaging/tools/release_note +++ b/packaging/tools/release_note @@ -1,3 +1,14 @@ +taos-1.6.4.0 (Release on 2019-12-01) +Bug fixed: + 1.Look for possible causes of file corruption and fix them + 2.Encapsulate memory allocation functions to reduce the possibility of crashes + 3.Increase Arm64 compilation options + 4.Remove most of the warnings in the code + 5.Provide a variety of connector usage documents + 6.Network connection can be selected in udp and tcp + 7.Allow the maximum number of Tags to be 32 + 8.Bugs reported by the user + taos-1.5.2.6 (Release on 2019-05-13) Bug fixed: - Nchar strings sometimes were wrongly truncated on Window diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index ea356a257a..8b057654c6 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -147,7 +147,10 @@ extern "C" { #define TSDB_MAX_MGMT_IPS (TSDB_MAX_MPEERS+1) #define TSDB_REPLICA_MIN_NUM 1 -#define TSDB_REPLICA_MAX_NUM 3 +/* + * this is defined in CMakeList.txt + */ +//#define TSDB_REPLICA_MAX_NUM 3 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_METERMETA_MAX_NUM 100000 // maximum batch size allowed to load metermeta diff --git a/src/util/src/version.c b/src/util/src/version.c index 35cd40a942..87426123ec 100644 --- a/src/util/src/version.c +++ b/src/util/src/version.c @@ -1,4 +1,4 @@ char version[64] = "1.6.4.0"; char compatible_version[64] = "1.6.1.0"; -char gitinfo[128] = "b6e308866e315483915f4c42a2717547ed0b9d36"; -char buildinfo[512] = "Built by ubuntu at 2019-11-26 21:56"; +char gitinfo[128] = "e86237890ee9daa8f6ab64ed225a027ae63ebdfc"; +char buildinfo[512] = "Built by ubuntu at 2019-11-30 23:16"; From 79fc94f019f82f54ed137277aa9c79d6365994b1 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 1 Dec 2019 09:29:58 +0800 Subject: [PATCH 06/55] [TBASE-1280] --- src/rpc/src/tudp.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rpc/src/tudp.c b/src/rpc/src/tudp.c index bfc95373fc..fb0b37d93b 100644 --- a/src/rpc/src/tudp.c +++ b/src/rpc/src/tudp.c @@ -377,6 +377,7 @@ void *taosTransferDataViaTcp(void *argv) { pThead->tcp = 1; pThead->msgType = (char)(pHeader->msgType - 1); pThead->msgLen = (int32_t)htonl(sizeof(STaosHeader)); + uint32_t id = pThead->sourceId; pThead->sourceId = pThead->destId; pThead->destId = id; pMonitor->ip = pTransfer->ip; pMonitor->port = pTransfer->port; pMonitor->pSet = pSet; From a7f79b508f77f849bd69a8905cd0cc8bd2df02f8 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 1 Dec 2019 12:27:29 +0800 Subject: [PATCH 07/55] public v1.6.4.0 --- src/util/src/version.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/src/version.c b/src/util/src/version.c index 87426123ec..ed83066f80 100644 --- a/src/util/src/version.c +++ b/src/util/src/version.c @@ -1,4 +1,4 @@ char version[64] = "1.6.4.0"; char compatible_version[64] = "1.6.1.0"; -char gitinfo[128] = "e86237890ee9daa8f6ab64ed225a027ae63ebdfc"; -char buildinfo[512] = "Built by ubuntu at 2019-11-30 23:16"; +char gitinfo[128] = "6d27c11e3b23ae69366df366a6517853648c41f7"; +char buildinfo[512] = "Built by ubuntu at 2019-12-01 12:27"; From faca8084e11dd83c07ba4c18dced20c54175f5a4 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 1 Dec 2019 13:24:12 +0800 Subject: [PATCH 08/55] resolve conflicts --- src/os/windows/src/twindows.c | 3 - src/system/src/vnodeImport.c | 985 ---------------------------------- src/util/src/tutil.c | 22 - 3 files changed, 1010 deletions(-) delete mode 100644 src/system/src/vnodeImport.c diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 49c2466669..98be6b60ba 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -132,8 +132,6 @@ short interlocked_or_fetch_16(short volatile* ptr, short val) { return _InterlockedOr16(ptr, val) | val; } -<<<<<<< HEAD -======= long interlocked_or_fetch_32(long volatile* ptr, long val) { return _InterlockedOr(ptr, val) | val; } @@ -207,7 +205,6 @@ __int64 interlocked_fetch_xor_64(__int64 volatile* ptr, __int64 val) { void tsPrintOsInfo() {} ->>>>>>> release/v1.6.4.0 void taosGetSystemTimezone() { // get and set default timezone SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c deleted file mode 100644 index bfffc66aa5..0000000000 --- a/src/system/src/vnodeImport.c +++ /dev/null @@ -1,985 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include - -#include "trpc.h" -#include "ttimer.h" -#include "vnode.h" -#include "vnodeMgmt.h" -#include "vnodeShell.h" -#include "vnodeShell.h" -#include "vnodeUtil.h" -#pragma GCC diagnostic ignored "-Wpointer-sign" -#pragma GCC diagnostic ignored "-Wint-conversion" - -typedef struct { - SCompHeader *headList; - SCompInfo compInfo; - int last; // 0:last block in data file, 1:not the last block - int newBlocks; - int oldNumOfBlocks; - int64_t compInfoOffset; // offset for compInfo in head file - int64_t leftOffset; // copy from this offset to end of head file - int64_t hfdSize; // old head file size -} SHeadInfo; - -typedef struct { - void *signature; - SShellObj *pShell; - SMeterObj *pObj; - int retry; - TSKEY firstKey; - TSKEY lastKey; - int importedRows; - int commit; // start to commit if it is set to 1 - - int slot; // slot/block to start writing the import data - int pos; // pos to start writing the import data in the slot/block - TSKEY key; - - // only for file - int numOfPoints; - int fileId; - int64_t offset; // offset in data file - SData *sdata[TSDB_MAX_COLUMNS]; - char *buffer; - char *payload; - char *opayload; // allocated space for payload from client - int rows; -} SImportInfo; - -int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport); - -int vnodeGetImportStartPart(SMeterObj *pObj, char *payload, int rows, TSKEY key1) { - int i; - - for (i = 0; i < rows; ++i) { - TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); - if (key >= key1) break; - } - - return i; -} - -int vnodeGetImportEndPart(SMeterObj *pObj, char *payload, int rows, char **pStart, TSKEY key0) { - int i; - - for (i = 0; i < rows; ++i) { - TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); - if (key > key0) break; - } - - *pStart = payload + i * pObj->bytesPerPoint; - return rows - i; -} - -int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - TSCKSUM chksum = 0; - - if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0; - - if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); - - int leftSize = pHinfo->hfdSize - pHinfo->leftOffset; - if (leftSize > 0) { - lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize); - } - - pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks; - int offset = (pHinfo->compInfo.numOfBlocks - pHinfo->oldNumOfBlocks) * sizeof(SCompBlock); - if (pHinfo->oldNumOfBlocks == 0) offset += sizeof(SCompInfo) + sizeof(TSCKSUM); - - pHinfo->headList[pObj->sid].compInfoOffset = pHinfo->compInfoOffset; - for (int sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) { - if (pHinfo->headList[sid].compInfoOffset) pHinfo->headList[sid].compInfoOffset += offset; - } - - lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize); - twrite(pVnode->nfd, pHinfo->headList, tmsize); - - int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); - char *buffer = malloc(size); - lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - read(pVnode->nfd, buffer, size); - SCompBlock *pBlock = (SCompBlock *)(buffer + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock)); - - pHinfo->compInfo.uid = pObj->uid; - pHinfo->compInfo.delimiter = TSDB_VNODE_DELIMITER; - pHinfo->compInfo.last = pBlock->last; - - taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo)); - lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET); - twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); - - chksum = taosCalcChecksum(0, (uint8_t *)buffer, size); - lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET); - twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); - free(buffer); - - vnodeCloseCommitFiles(pVnode); - - return 0; -} - -int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCompBlock lastBlock; - int code = 0; - - if (pHinfo->compInfo.last == 0) return 0; - - // read into memory - uint64_t offset = - pHinfo->compInfoOffset + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock) + sizeof(SCompInfo); - lseek(pVnode->hfd, offset, SEEK_SET); - read(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); - assert(lastBlock.last); - - if (lastBlock.sversion != pObj->sversion) { - lseek(pVnode->lfd, lastBlock.offset, SEEK_SET); - lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END); - tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len); - - lastBlock.last = 0; - lseek(pVnode->hfd, offset, SEEK_SET); - twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); - } else { - vnodeReadLastBlockToMem(pObj, &lastBlock, data); - pHinfo->compInfo.numOfBlocks--; - code = lastBlock.numOfPoints; - } - - return code; -} - -int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - TSKEY firstKey = *((TSKEY *)payload); - struct stat filestat; - int sid, rowsBefore = 0; - - if (pVnode->nfd <= 0 || firstKey > pVnode->commitLastKey) { - if (pVnode->nfd > 0) vnodeCloseFileForImport(pObj, pHinfo); - - pVnode->commitFirstKey = firstKey; - if (vnodeOpenCommitFiles(pVnode, pObj->sid) < 0) return -1; - - fstat(pVnode->hfd, &filestat); - pHinfo->hfdSize = filestat.st_size; - pHinfo->newBlocks = 0; - pHinfo->last = 1; // by default, new blockes are at the end of block list - - lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - read(pVnode->hfd, pHinfo->headList, sizeof(SCompHeader) * pCfg->maxSessions); - - if (pHinfo->headList[pObj->sid].compInfoOffset > 0) { - lseek(pVnode->hfd, pHinfo->headList[pObj->sid].compInfoOffset, SEEK_SET); - if (read(pVnode->hfd, &pHinfo->compInfo, sizeof(SCompInfo)) != sizeof(SCompInfo)) { - dError("vid:%d sid:%d, failed to read compInfo from file:%s", pObj->vnode, pObj->sid, pVnode->cfn); - return -1; - } - - if (pHinfo->compInfo.uid == pObj->uid) { - pHinfo->compInfoOffset = pHinfo->headList[pObj->sid].compInfoOffset; - pHinfo->leftOffset = pHinfo->headList[pObj->sid].compInfoOffset + sizeof(SCompInfo); - } else { - pHinfo->headList[pObj->sid].compInfoOffset = 0; - } - } - - if ( pHinfo->headList[pObj->sid].compInfoOffset == 0 ) { - memset(&pHinfo->compInfo, 0, sizeof(SCompInfo)); - pHinfo->compInfo.uid = pObj->uid; - - for (sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) - if (pHinfo->headList[sid].compInfoOffset > 0) break; - - pHinfo->compInfoOffset = (sid == pCfg->maxSessions) ? pHinfo->hfdSize : pHinfo->headList[sid].compInfoOffset; - pHinfo->leftOffset = pHinfo->compInfoOffset; - } - - pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks; - lseek(pVnode->hfd, 0, SEEK_SET); - lseek(pVnode->nfd, 0, SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset); - twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); - if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR); - - if (pVnode->commitFileId < pImport->fileId) { - if (pHinfo->compInfo.numOfBlocks > 0) - pHinfo->leftOffset += pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); - - rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); - - // copy all existing compBlockInfo - lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - if (pHinfo->compInfo.numOfBlocks > 0) - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock)); - - } else if (pVnode->commitFileId == pImport->fileId) { - int slots = pImport->pos ? pImport->slot + 1 : pImport->slot; - assert(slots >= 0); - pHinfo->leftOffset += slots * sizeof(SCompBlock); - - // check if last block is at last file, if it is, read into memory - if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks && - pHinfo->compInfo.last) { - rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); - if ( rowsBefore > 0 ) pImport->slot--; - } - - // this block will be replaced by new blocks - if (pImport->pos > 0) pHinfo->compInfo.numOfBlocks--; - - if (pImport->slot > 0) { - lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock)); - } - - if (pImport->slot < pHinfo->compInfo.numOfBlocks) - pHinfo->last = 0; // new blocks are not at the end of block list - - } else { - // nothing - - pHinfo->last = 0; // new blocks are not at the end of block list - } - } - - return rowsBefore; -} - -extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints); -int vnodeImportToFile(SImportInfo *pImport); - -void vnodeProcessImportTimer(void *param, void *tmrId) { - SImportInfo *pImport = (SImportInfo *)param; - if (pImport == NULL || pImport->signature != param) { - dError("import timer is messed up, signature:%p", pImport); - return; - } - - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj *pShell = pImport->pShell; - - pImport->retry++; - - //slow query will block the import operation - int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); - if (state >= TSDB_METER_STATE_DELETING) { - dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", - pObj->vnode, pObj->sid, pObj->meterId, state); - return; - } - - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); - - //if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY - int32_t commitInProcess = 0; - pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) { - pthread_mutex_unlock(&pPool->vmutex); - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - - if (pImport->retry < 1000) { - dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." - "commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId, - commitInProcess, num, state); - - taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); - return; - } else { - pShell->code = TSDB_CODE_TOO_SLOW; - } - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, pImport); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pImport->importedRows; - } - } - - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - - pVnode->version++; - - // send response back to shell - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pImport->pShell, pShell->code, pShell->numOfTotalPoints); - } - - pImport->signature = NULL; - free(pImport->opayload); - free(pImport); -} - -int vnodeImportToFile(SImportInfo *pImport) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - SHeadInfo headInfo; - int code = 0, col; - SCompBlock compBlock; - char *payload = pImport->payload; - int rows = pImport->rows; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - - TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); - TSKEY firstKey = *((TSKEY *)payload); - memset(&headInfo, 0, sizeof(headInfo)); - headInfo.headList = malloc(sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM)); - - SData *cdata[TSDB_MAX_COLUMNS]; - char * buffer1 = - malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns); - cdata[0] = (SData *)buffer1; - - SData *data[TSDB_MAX_COLUMNS]; - char * buffer2 = - malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns); - data[0] = (SData *)buffer2; - - for (col = 1; col < pObj->numOfColumns; ++col) { - cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - } - - int rowsBefore = 0; - int rowsRead = 0; - int rowsUnread = 0; - int leftRows = rows; // left number of rows of imported data - int row, rowsToWrite; - int64_t offset[TSDB_MAX_COLUMNS]; - - if (pImport->pos > 0) { - for (col = 0; col < pObj->numOfColumns; ++col) - memcpy(data[col]->data, pImport->sdata[col]->data, pImport->pos * pObj->schema[col].bytes); - - rowsBefore = pImport->pos; - rowsRead = pImport->pos; - rowsUnread = pImport->numOfPoints - pImport->pos; - } - - dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to file, firstKey:%ld lastKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); - do { - if (leftRows > 0) { - code = vnodeOpenFileForImport(pImport, payload, &headInfo, data); - if (code < 0) goto _exit; - if (code > 0) { - rowsBefore = code; - code = 0; - }; - } else { - // if payload is already imported, rows unread shall still be processed - rowsBefore = 0; - } - - int rowsToProcess = pObj->pointsPerFileBlock - rowsBefore; - if (rowsToProcess > leftRows) rowsToProcess = leftRows; - - for (col = 0; col < pObj->numOfColumns; ++col) { - offset[col] = data[col]->data + rowsBefore * pObj->schema[col].bytes; - } - - row = 0; - if (leftRows > 0) { - for (row = 0; row < rowsToProcess; ++row) { - if (*((TSKEY *)payload) > pVnode->commitLastKey) break; - - for (col = 0; col < pObj->numOfColumns; ++col) { - memcpy((void *)offset[col], payload, pObj->schema[col].bytes); - payload += pObj->schema[col].bytes; - offset[col] += pObj->schema[col].bytes; - } - } - } - - leftRows -= row; - rowsToWrite = rowsBefore + row; - rowsBefore = 0; - - if (leftRows == 0 && rowsUnread > 0) { - // copy the unread - int rowsToCopy = pObj->pointsPerFileBlock - rowsToWrite; - if (rowsToCopy > rowsUnread) rowsToCopy = rowsUnread; - - for (col = 0; col < pObj->numOfColumns; ++col) { - int bytes = pObj->schema[col].bytes; - memcpy(data[col]->data + rowsToWrite * bytes, pImport->sdata[col]->data + rowsRead * bytes, rowsToCopy * bytes); - } - - rowsRead += rowsToCopy; - rowsUnread -= rowsToCopy; - rowsToWrite += rowsToCopy; - } - - for (col = 0; col < pObj->numOfColumns; ++col) { - data[col]->len = rowsToWrite * pObj->schema[col].bytes; - } - - compBlock.last = headInfo.last; - vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite); - twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)); - - rowsToWrite = 0; - headInfo.newBlocks++; - - } while (leftRows > 0 || rowsUnread > 0); - - if (compBlock.keyLast > pObj->lastKeyOnFile) - pObj->lastKeyOnFile = compBlock.keyLast; - - vnodeCloseFileForImport(pObj, &headInfo); - dTrace("vid:%d sid:%d id:%s, %d rows data are imported to file", pObj->vnode, pObj->sid, pObj->meterId, rows); - - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - pthread_mutex_lock(&pPool->vmutex); - - if (pInfo->numOfBlocks > 0) { - int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; - TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0])); - - // data may be in commited cache, cache shall be released - if (lastKey > firstKeyInCache) { - while (slot != pInfo->commitSlot) { - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - vnodeFreeCacheBlock(pCacheBlock); - slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; - } - - // last slot, the uncommitted slots shall be shifted - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - int points = pCacheBlock->numOfPoints - pInfo->commitPoint; - if (points > 0) { - for (int col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memmove(pCacheBlock->offset[col], pCacheBlock->offset[col] + pObj->schema[col].bytes * pInfo->commitPoint, size); - } - } - - if (pInfo->commitPoint != pObj->pointsPerBlock) { - // commit point shall be set to 0 if last block is not full - pInfo->commitPoint = 0; - pCacheBlock->numOfPoints = points; - if (slot == pInfo->currentSlot) { - __sync_fetch_and_add(&pObj->freePoints, pInfo->commitPoint); - } - } else { - // if last block is full and committed - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - if (pCacheBlock->pMeterObj == pObj) { - vnodeFreeCacheBlock(pCacheBlock); - } - } - } - } - - if (lastKey > pObj->lastKeyOnFile) pObj->lastKeyOnFile = lastKey; - - pthread_mutex_unlock(&pPool->vmutex); - -_exit: - tfree(headInfo.headList); - tfree(buffer1); - tfree(buffer2); - tfree(pImport->buffer); - - return code; -} - -int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - int code = -1; - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - int slot, pos, row, col, points, tpoints; - - char *data[TSDB_MAX_COLUMNS], *current[TSDB_MAX_COLUMNS]; - int slots = pInfo->unCommittedBlocks + 1; - int trows = slots * pObj->pointsPerBlock + rows; // max rows in buffer - int tsize = (trows / pObj->pointsPerBlock + 1) * pCfg->cacheBlockSize; - TSKEY firstKey = *((TSKEY *)payload); - TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); - - if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { - dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->freePoints); - pImport->importedRows = 0; - pImport->commit = 1; - code = TSDB_CODE_ACTION_IN_PROGRESS; - return code; - } - - assert(rows); - dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); - - pthread_mutex_lock(&(pVnode->vmutex)); - if (firstKey < pVnode->firstKey) pVnode->firstKey = firstKey; - pthread_mutex_unlock(&(pVnode->vmutex)); - - char *buffer = malloc(tsize); // buffer to hold unCommitted data plus import data - data[0] = buffer; - current[0] = data[0]; - for (col = 1; col < pObj->numOfColumns; ++col) { - data[col] = data[col - 1] + trows * pObj->schema[col - 1].bytes; - current[col] = data[col]; - } - - // write import data into buffer first - for (row = 0; row < rows; ++row) { - for (col = 0; col < pObj->numOfColumns; ++col) { - memcpy(current[col], payload, pObj->schema[col].bytes); - payload += pObj->schema[col].bytes; - current[col] += pObj->schema[col].bytes; - } - } - - // copy the overwritten data into buffer - tpoints = rows; - pos = pImport->pos; - slot = pImport->slot; - while (1) { - points = pInfo->cacheBlocks[slot]->numOfPoints - pos; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(current[col], pInfo->cacheBlocks[slot]->offset[col] + pos * pObj->schema[col].bytes, size); - current[col] += size; - } - pos = 0; - tpoints += points; - - if (slot == pInfo->currentSlot) break; - slot = (slot + 1) % pInfo->maxBlocks; - } - - for (col = 0; col < pObj->numOfColumns; ++col) current[col] = data[col]; - pos = pImport->pos; - - // write back to existing slots first - slot = pImport->slot; - while (1) { - points = (tpoints > pObj->pointsPerBlock - pos) ? pObj->pointsPerBlock - pos : tpoints; - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); - current[col] += size; - } - pCacheBlock->numOfPoints = points + pos; - pos = 0; - tpoints -= points; - - if (slot == pInfo->currentSlot) break; - slot = (slot + 1) % pInfo->maxBlocks; - } - - // allocate new cache block if there are still data left - while (tpoints > 0) { - pImport->commit = vnodeAllocateCacheBlock(pObj); - if (pImport->commit < 0) goto _exit; - points = (tpoints > pObj->pointsPerBlock) ? pObj->pointsPerBlock : tpoints; - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[pInfo->currentSlot]; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); - current[col] += size; - } - tpoints -= points; - pCacheBlock->numOfPoints = points; - } - - code = 0; - __sync_fetch_and_sub(&pObj->freePoints, rows); - dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows); - -_exit: - free(buffer); - return code; -} - -int vnodeFindKeyInFile(SImportInfo *pImport, int order) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - int code = -1; - SQuery query; - SColumnFilter colList[TSDB_MAX_COLUMNS] = {0}; - - TSKEY key = order ? pImport->firstKey : pImport->lastKey; - memset(&query, 0, sizeof(query)); - query.order.order = order; - query.skey = key; - query.ekey = order ? INT64_MAX : 0; - query.colList = colList; - query.numOfCols = pObj->numOfColumns; - - for (int16_t i = 0; i < pObj->numOfColumns; ++i) { - colList[i].data.colId = pObj->schema[i].colId; - colList[i].data.bytes = pObj->schema[i].bytes; - colList[i].data.type = pObj->schema[i].type; - - colList[i].colIdx = i; - colList[i].colIdxInBuf = i; - } - - int ret = vnodeSearchPointInFile(pObj, &query); - - if (ret >= 0) { - if (query.slot < 0) { - pImport->slot = 0; - pImport->pos = 0; - pImport->key = 0; - pImport->fileId = pVnode->fileId - pVnode->numOfFiles + 1; - dTrace("vid:%d sid:%d id:%s, import to head of file", pObj->vnode, pObj->sid, pObj->meterId); - code = 0; - } else if (query.slot >= 0) { - code = 0; - pImport->slot = query.slot; - pImport->pos = query.pos; - pImport->key = query.key; - pImport->fileId = query.fileId; - SCompBlock *pBlock = &query.pBlock[query.slot]; - pImport->numOfPoints = pBlock->numOfPoints; - - if (pImport->key != key) { - if (order == 0) { - pImport->pos++; - - if (pImport->pos >= pBlock->numOfPoints) { - pImport->slot++; - pImport->pos = 0; - } - } else { - if (pImport->pos < 0) pImport->pos = 0; - } - } - - if (pImport->key != key && pImport->pos > 0) { - if ( pObj->sversion != pBlock->sversion ) { - dError("vid:%d sid:%d id:%s, import sversion not matached, expected:%d received:%d", pObj->vnode, pObj->sid, - pBlock->sversion, pObj->sversion); - code = TSDB_CODE_OTHERS; - } else { - pImport->offset = pBlock->offset; - - pImport->buffer = - malloc(pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + sizeof(SData) * pObj->numOfColumns); - pImport->sdata[0] = (SData *)pImport->buffer; - for (int col = 1; col < pObj->numOfColumns; ++col) - pImport->sdata[col] = (SData *)(((char *)pImport->sdata[col - 1]) + sizeof(SData) + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - - code = vnodeReadCompBlockToMem(pObj, &query, pImport->sdata); - if (code < 0) { - code = -code; - tfree(pImport->buffer); - } - } - } - } - } else { - dError("vid:%d sid:%d id:%s, file is corrupted, import failed", pObj->vnode, pObj->sid, pObj->meterId); - code = -ret; - } - - tclose(query.hfd); - tclose(query.dfd); - tclose(query.lfd); - vnodeFreeFields(&query); - tfree(query.pBlock); - - return code; -} - -int vnodeFindKeyInCache(SImportInfo *pImport, int order) { - SMeterObj *pObj = pImport->pObj; - int code = 0; - SQuery query; - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - - TSKEY key = order ? pImport->firstKey : pImport->lastKey; - memset(&query, 0, sizeof(query)); - query.order.order = order; - query.skey = key; - query.ekey = order ? pImport->lastKey : pImport->firstKey; - vnodeSearchPointInCache(pObj, &query); - - if (query.slot < 0) { - pImport->slot = pInfo->commitSlot; - if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; - pImport->pos = 0; - pImport->key = 0; - dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key); - code = 0; - } else { - pImport->slot = query.slot; - pImport->pos = query.pos; - pImport->key = query.key; - - if (key != query.key) { - if (order == 0) { - // since pos is the position which has smaller key, data shall be imported after it - pImport->pos++; - if (pImport->pos >= pObj->pointsPerBlock) { - pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; - pImport->pos = 0; - } - } else { - if (pImport->pos < 0) pImport->pos = 0; - } - } - code = 0; - } - - return code; -} - -int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInCache(pImport, 1); - if (code != 0) return code; - - if (pImport->key != pImport->firstKey) { - rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); - pImport->importedRows = rows; - code = vnodeImportToCache(pImport, payload, rows); - } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to cache, firstKey:%lld", pObj->vnode, pObj->sid, pObj->meterId, pImport->firstKey); - } - - return code; -} - -int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInFile(pImport, 1); - if (code != 0) return code; - - assert(pImport->slot >= 0); - - if (pImport->key != pImport->firstKey) { - pImport->payload = payload; - pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); - pImport->importedRows = pImport->rows; - code = vnodeImportToFile(pImport); - } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId); - } - - return code; -} - -int vnodeImportWholeToFile(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInFile(pImport, 0); - if (code != 0) return code; - - if (pImport->key != pImport->lastKey) { - pImport->payload = payload; - pImport->rows = vnodeGetImportEndPart(pObj, payload, rows, &pImport->payload, pImport->key); - pImport->importedRows = pImport->rows; - code = vnodeImportToFile(pImport); - } else { - code = vnodeImportStartToFile(pImport, payload, rows); - } - - return code; -} - -int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInCache(pImport, 0); - if (code != 0) return code; - - if (pImport->key != pImport->lastKey) { - char *pStart; - if ( pImport->key < pObj->lastKeyOnFile ) pImport->key = pObj->lastKeyOnFile; - rows = vnodeGetImportEndPart(pObj, payload, rows, &pStart, pImport->key); - pImport->importedRows = rows; - code = vnodeImportToCache(pImport, pStart, rows); - } else { - if (pImport->firstKey > pObj->lastKeyOnFile) { - code = vnodeImportStartToCache(pImport, payload, rows); - } else if (pImport->firstKey < pObj->lastKeyOnFile) { - code = vnodeImportStartToFile(pImport, payload, rows); - } else { // firstKey == pObj->lastKeyOnFile - dTrace("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId); - } - } - - return code; -} - -int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, - int *pNumOfPoints, TSKEY now) { - SSubmitMsg *pSubmit = (SSubmitMsg *)cont; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - int rows; - char *payload; - int code = TSDB_CODE_ACTION_IN_PROGRESS; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj *pShell = (SShellObj *)param; - int pointsImported = 0; - - rows = htons(pSubmit->numOfRows); - int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); - if (expectedLen != contLen) { - dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, - expectedLen, contLen); - return TSDB_CODE_WRONG_MSG_SIZE; - } - - if (sversion != pObj->sversion) { - dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->sversion, sversion); - return TSDB_CODE_OTHERS; - } - - payload = pSubmit->payLoad; - int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - int cfile = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) { - dError("vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1))); - return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; - } - - if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { - if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; - code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); - if (code != 0) return code; - } - - if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); - - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pointsImported; - } - - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); - } else { - SImportInfo *pNew, import; - - dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows); - memset(&import, 0, sizeof(import)); - import.firstKey = *((TSKEY *)(payload)); - import.lastKey = *((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)); - import.pObj = pObj; - import.pShell = pShell; - import.payload = payload; - import.rows = rows; - - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); - - int32_t commitInProcess = 0; - - pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { - pthread_mutex_unlock(&pPool->vmutex); - - pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); - memcpy(pNew, &import, sizeof(SImportInfo)); - pNew->signature = pNew; - int payloadLen = contLen - sizeof(SSubmitMsg); - pNew->payload = malloc(payloadLen); - pNew->opayload = pNew->payload; - memcpy(pNew->payload, payload, payloadLen); - - dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, - pObj->meterId, commitInProcess, pObj->numOfQueries); - - taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); - return 0; - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, &import); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += import.importedRows; - } - } - } - - pVnode->version++; - - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints); - } - - return 0; -} - -//todo abort from the procedure if the meter is going to be dropped -int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { - int code = 0; - - if (pImport->lastKey > pObj->lastKeyOnFile) { - code = vnodeImportWholeToCache(pImport, pImport->payload, pImport->rows); - } else if (pImport->lastKey < pObj->lastKeyOnFile) { - code = vnodeImportWholeToFile(pImport, pImport->payload, pImport->rows); - } else { // lastKey == pObj->lastkeyOnFile - code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows); - } - - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - pPool->commitInProcess = 0; - - if (pImport->commit) vnodeProcessCommitTimer(pVnode, NULL); - - return code; -} diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index bd52ac0bc4..e0dcb0aa3f 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -493,27 +493,6 @@ char *taosIpStr(uint32_t ipInt) { sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); return ipStr; } -<<<<<<< HEAD - -typedef struct CharsetPair { - char *oldCharset; - char *newCharset; -} CharsetPair; - -char *taosCharsetReplace(char *charsetstr) { - CharsetPair charsetRep[] = { - { "utf8", "UTF-8" }, { "936", "CP936" }, - }; - - for (int32_t i = 0; i < tListLen(charsetRep); ++i) { - if (strcasecmp(charsetRep[i].oldCharset, charsetstr) == 0) { - return strdup(charsetRep[i].newCharset); - } - } - - return strdup(charsetstr); -} -======= #ifndef CLUSTER void taosCleanupTier() {} @@ -549,4 +528,3 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } ->>>>>>> release/v1.6.4.0 From 2e60dfb3a054772a60966ce3055c4a4ff62cfa6b Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Mon, 2 Dec 2019 11:13:33 +0800 Subject: [PATCH 09/55] [TBASE 1270]revise the go connector description --- .../webdocs/markdowndocs/connector-ch.md | 55 ++++++++++++++++++- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index a8ceaa848f..03801fac89 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -391,15 +391,64 @@ curl -u username:password -d '' :/rest/sql ## Go Connector -TDengine提供了GO驱动程序“taosSql”包。taosSql驱动包是基于GO的“database/sql/driver”接口的实现。用户可在安装后的/usr/local/taos/connector/go目录获得GO的客户端驱动程序。用户需将驱动包/usr/local/taos/connector/go/src/taosSql目录拷贝到应用程序工程的src目录下。然后在应用程序中导入驱动包,就可以使用“database/sql”中定义的接口访问TDengine: +#### 安装TDengine + +Go的链接器使用了到了 libtaos.so 和taos.h,因此,在使用Go连接器之前,需要在程序运行的机器上安装TDengine以获得相关的驱动文件。 + +#### Go语言引入package +TDengine提供了GO驱动程序“taosSql”包。taosSql驱动包是基于GO的“database/sql/driver”接口的实现。用户可以通过`go get`命令来获取驱动包。 +```sh +go get github.com/taosdata/TDengine/src/connector/go/src/taosSql +``` +然后在应用程序中导入驱动包,就可以使用“database/sql”中定义的接口访问TDengine: ```Go import ( "database/sql" - _ "taosSql" + _ "github.com/taosdata/TDengine/src/connector/go/src/taosSql" ) ``` taosSql驱动包内采用cgo模式,调用了TDengine的C/C++同步接口,与TDengine进行交互,因此,在数据库操作执行完成之前,客户端应用将处于阻塞状态。单个数据库连接,在同一时刻只能有一个线程调用API。客户应用可以建立多个连接,进行多线程的数据写入或查询处理。 -更多使用的细节,请参考下载目录中的示例源码。 +#### Go语言使用参考 +在Go程序中使用TDengine写入方法大致可以分为以下几步 +1. 打开TDengine数据库链接 + +首先需要调用sql包中的Open方法,打开数据库,并获得db对象 +```go + db, err := sql.Open(taosDriverName, dbuser+":"+dbpassword+"@/tcp("+daemonUrl+")/"+dbname) + if err != nil { + log.Fatalf("Open database error: %s\n", err) + } + defer db.Close() +``` +其中参数为 +- taosDataname: 涛思数据库的名称,其值为字符串"taosSql" +- dbuser和dbpassword: 链接TDengine的用户名和密码,缺省为root和taosdata,类型为字符串 +- daemonUrl: 为TDengine的地址,其形式为`ip address:port`形式,port填写缺省值0即可。例如:"116.118.24.71:0" +- dbname:TDengine中的database名称,通过`create database`创建的数据库。如果为空则在后续的写入和查询操作必须通过”数据库名.超级表名或表名“的方式指定数据库名 + +2. 创建数据库 + +打开TDengine数据库连接后,首选需要创建数据库。基本用法和直接在TDengine客户端shell下一样,通过create database + 数据库名的方法来创建。 +```go + db, err := sql.Open(taosDriverName, dbuser+":"+dbpassword+"@/tcp("+daemonUrl+")/") + if err != nil { + log.Fatalf("Open database error: %s\n", err) + } + defer db.Close() + + //准备创建数据库语句 + sqlcmd := fmt.Sprintf("create database if not exists %s", dbname) + + //执行语句并检查错误 + _, err = db.Exec(sqlcmd) + if err != nil { + log.Fatalf("Create database error: %s\n", err) + } +``` + +3. 创建表、写入和查询数据 + +在创建好了数据库后,就可以开始创建表和写入查询数据了。这些操作的基本思路都是首先组装SQL语句,然后调用db.Exec执行,并检查错误信息和执行相应的处理。可以参考上面的样例代码 From 167d6cb70edc8a92d32f804eaa5f51d30d28c95e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 2 Dec 2019 15:47:18 +0800 Subject: [PATCH 10/55] fix #813 --- src/system/detail/src/vnodeImport.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 2a0fda29f9..f0019a92ee 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -466,8 +466,6 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int SCompBlock *pBlock = pHandle->pBlocks + blockId; *code = TSDB_CODE_SUCCESS; - assert(pBlock->sversion == pObj->sversion); - SVnodeObj *pVnode = vnodeList + pObj->vnode; int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd; @@ -989,6 +987,13 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } } + int aslot = MIN(blockIter.slot, importHandle.compInfo.numOfBlocks - 1); + int64_t sversion = importHandle.pBlocks[aslot].sversion; + if (sversion != pObj->sversion) { + code = TSDB_CODE_OTHERS; + goto _error_merge; + } + // Open the new .t file if not opened yet. if (pVnode->nfd <= 0) { if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) { From e593c12cef6b7247d5d7ed24d372182fdbc31ee4 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 16:18:32 +0800 Subject: [PATCH 11/55] Instructions for using the RESTful interface --- .../webdocs/markdowndocs/connector-ch.md | 126 ++++++++++++++++-- 1 file changed, 116 insertions(+), 10 deletions(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index 9261b9eef6..e629a44bf1 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -316,11 +316,23 @@ import taos 如:http://192.168.0.1:6020/rest/sql 是指向IP地址为192.168.0.1的URL. -HTTP请求的Header里需带有身份认证信息,TDengine单机版仅支持Basic认证机制。 +HTTP请求的Header里需带有身份认证信息,TDengine支持Basic认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。 + +- 自定义身份认证信息如下所示(稍后介绍) + +``` +Authorization: Taosd +``` + +- Basic身份认证信息如下所示 + +``` +Authorization: Basic +``` HTTP请求的BODY里就是一个完整的SQL语句,SQL语句中的数据表应提供数据库前缀,例如\.\。如果表名不带数据库前缀,系统会返回错误。因为HTTP模块只是一个简单的转发,没有当前DB的概念。 -使用curl来发起一个HTTP Request, 语法如下: +使用curl通过自定义身份认证方式来发起一个HTTP Request, 语法如下: ``` curl -H 'Authorization: Basic ' -d '' :/rest/sql @@ -332,7 +344,7 @@ curl -H 'Authorization: Basic ' -d '' :/rest/sql curl -u username:password -d '' :/rest/sql ``` -其中,`TOKEN`为`{username}:{password}`经过Base64编码之后的字符串,例如`root:taosdata`编码后为`cm9vdDp0YW9zZGF0YQ==` +其中,`TOKEN`为`{username}:{password}`经过Base64编码之后的字符串, 例如`root:taosdata`编码后为`cm9vdDp0YW9zZGF0YQ==` ### HTTP返回格式 @@ -356,21 +368,55 @@ curl -u username:password -d '' :/rest/sql - 第三行是具体返回的数据,一排一排的呈现。如果不返回结果集,仅[[affected_rows]] - 第四行”rows”表明总共多少行数据 +### 自定义授权码 + +HTTP请求中需要带有授权码``, 用于身份识别。授权码通常由管理员提供, 可简单的通过发送`HTTP GET`请求来获取授权码, 操作如下: + +``` +curl http://:6020/rest/login// +``` + +其中, `ip`是TDengine数据库的IP地址, `username`为数据库用户名, `password`为数据库密码, 返回值为`JSON`格式, 各字段含义如下: + +- status:请求结果的标志位 + +- code:返回值代码 + +- desc: 授权码 + +获取授权码示例: + +``` +curl http://192.168.0.1:6020/rest/login/root/taosdata +``` + +返回值: + +``` +{ + "status": "succ", + "code": 0, + "desc": +"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04" +} +``` + ### 使用示例 - 在demo库里查询表t1的所有记录, curl如下: - `curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sql` - - 返回值: +``` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sql` +``` +返回值: ``` { "status": "succ", "head": ["column1","column2","column3"], "data": [ - ["2017-12-12 23:44:25.730", 1, 2.3], - ["2017-12-12 22:44:25.728", 4, 5.6] + ["2017-12-12 22:44:25.728",4,5.60000], + ["2017-12-12 23:44:25.730",1,2.30000] ], "rows": 2 } @@ -378,9 +424,11 @@ curl -u username:password -d '' :/rest/sql - 创建库demo: - `curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 192.168.0.1:6020/rest/sql` +``` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 192.168.0.1:6020/rest/sql` +``` - 返回值: +返回值: ``` { "status": "succ", @@ -390,6 +438,64 @@ curl -u username:password -d '' :/rest/sql } ``` +### 其他用法 + +#### 结果集采用Unix时间戳 + +HTTP请求URL采用`sqlt`时,返回结果集的时间戳将采用Unix时间戳格式表示,例如 + +``` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlt` +``` + +返回值: + +``` +{ + "status": "succ", + "head": ["column1","column2","column3"], + "data": [ + [1513089865728,4,5.60000], + [1513093465730,1,2.30000] + ], + "rows": 2 +} +``` + +#### 结果集采用UTC时间字符串 + +HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间字符串表示,例如 +``` + curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlutc` +``` + +返回值: + +``` +{ + "status": "succ", + "head": ["column1","column2","column3"], + "data": [ + ["2017-12-12T22:44:25.728+0800",4,5.60000], + ["2017-12-12T23:44:25.730+0800",1,2.30000] + ], + "rows": 2 +} +``` + +### 重要配置项 + +下面仅列出一些与RESTFul接口有关的配置参数,其他系统参数请看配置文件里的说明。注意:配置修改后,需要重启taosd服务才能生效 + +- httpIp: 对外提供RESTFul服务的IP地址,默认绑定到0.0.0.0 +- httpPort: 对外提供RESTFul服务的端口号,默认绑定到6020 +- httpMaxThreads: 启动的线程数量,默认为2 +- httpCacheSessions: 缓存连接的数量,并发请求数目需小于此数值的10倍,默认值为100 +- restfulRowLimit: 返回结果集(JSON格式)的最大条数,默认值为10240 +- httpEnableCompress: 是否支持压缩,默认不支持,目前TDengine仅支持gzip压缩格式 +- httpDebugFlag: 日志开关,131:仅错误和报警信息,135:所有,默认131 + + ## Go Connector #### 安装TDengine From 0517feb1fd1c52733ca8683b78c913a7eb594a43 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 16:31:38 +0800 Subject: [PATCH 12/55] Instructions for using the RESTful interface --- .../webdocs/markdowndocs/connector-ch.md | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index e629a44bf1..85596f2053 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -306,15 +306,16 @@ import taos ### HTTP请求格式 -​ `http://:/rest/sql` +``` +http://:/rest/sql +``` -​ 参数说明: +​参数说明: -​ IP: 集群中的任一台主机 +- IP: 集群中的任一台主机 +- PORT: 配置文件中httpPort配置项,缺省为6020 -​ PORT: 配置文件中httpPort配置项,缺省为6020 - -如:http://192.168.0.1:6020/rest/sql 是指向IP地址为192.168.0.1的URL. +例如:http://192.168.0.1:6020/rest/sql 是指向IP地址为192.168.0.1的URL. HTTP请求的Header里需带有身份认证信息,TDengine支持Basic认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。 @@ -348,7 +349,8 @@ curl -u username:password -d '' :/rest/sql ### HTTP返回格式 -返回值为JSON格式,如下: +返回值为JSON格式,如下: + ``` { "status": "succ", @@ -363,10 +365,10 @@ curl -u username:password -d '' :/rest/sql 说明: -- 第一行”status”告知操作结果是成功还是失败; -- 第二行”head”是表的定义,如果不返回结果集,仅有一列“affected_rows”; -- 第三行是具体返回的数据,一排一排的呈现。如果不返回结果集,仅[[affected_rows]] -- 第四行”rows”表明总共多少行数据 +- status: 告知操作结果是成功还是失败 +- head: 表的定义,如果不返回结果集,仅有一列“affected_rows” +- data: 具体返回的数据,一排一排的呈现,如果不返回结果集,仅[[affected_rows]] +- rows: 表明总共多少行数据 ### 自定义授权码 @@ -403,7 +405,7 @@ curl http://192.168.0.1:6020/rest/login/root/taosdata ### 使用示例 -- 在demo库里查询表t1的所有记录, curl如下: +- 在demo库里查询表t1的所有记录: ``` curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sql` @@ -445,7 +447,7 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 19 HTTP请求URL采用`sqlt`时,返回结果集的时间戳将采用Unix时间戳格式表示,例如 ``` -curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlt` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlt ``` 返回值: @@ -466,7 +468,7 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 1 HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间字符串表示,例如 ``` - curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlutc` + curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlutc ``` 返回值: From e5418dd3e13370be8a5d5b7518e5069472389e58 Mon Sep 17 00:00:00 2001 From: haojun Liao Date: Mon, 2 Dec 2019 18:37:51 +0800 Subject: [PATCH 13/55] Update TAOS SQL-ch.md Add some detailed description with respect to the time unit conversion for interval clause. --- documentation/webdocs/markdowndocs/TAOS SQL-ch.md | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation/webdocs/markdowndocs/TAOS SQL-ch.md index 6a8549bbd2..347ac4f21f 100644 --- a/documentation/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation/webdocs/markdowndocs/TAOS SQL-ch.md @@ -18,6 +18,7 @@ TDengine提供类似SQL语法,用户可以在TDengine Shell中使用SQL语句 - 插入记录时,如果时间戳为0,插入数据时使用服务器当前时间 - Epoch Time: 时间戳也可以是一个长整数,表示从1970-01-01 08:00:00.000开始的毫秒数 - 时间可以加减,比如 now-2h,表明查询时刻向前推2个小时(最近2小时)。数字后面的时间单位:a(毫秒), s(秒), m(分), h(小时), d(天),w(周), n(月), y(年)。比如select * from t1 where ts > now-2w and ts <= now-1w, 表示查询两周前整整一周的数据 +- TDengine暂不支持时间窗口按照自然年和自然月切分。Where条件中的时间窗口单位的换算关系如下:interval(1y) 等效于 interval(365d), interval(1n) 等效于 interval(30d), interval(1w) 等效于 interval(7d) TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMicrosecond就可支持微秒。 From d468cc5eb2d5187c06982b4bcec11fb195101a54 Mon Sep 17 00:00:00 2001 From: lihui Date: Mon, 2 Dec 2019 19:01:20 +0800 Subject: [PATCH 14/55] [TBASE-1271] --- packaging/deb/makedeb.sh | 2 +- packaging/rpm/makerpm.sh | 27 ++++++++++++++++++++++++--- packaging/rpm/tdengine.spec | 2 +- packaging/tools/makeclient.sh | 4 ++-- packaging/tools/makepkg.sh | 4 ++-- src/kit/shell/src/shellEngine.c | 14 ++++++++++++-- src/kit/shell/src/shellLinux.c | 9 +++++++++ src/system/detail/src/dnodeService.c | 7 ++++++- 8 files changed, 57 insertions(+), 12 deletions(-) diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index 5766bd6836..501a06eddb 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -63,7 +63,7 @@ debver="Version: "$tdengine_ver sed -i "2c$debver" ${pkg_dir}/DEBIAN/control #get taos version, then set deb name -debname="tdengine-"${tdengine_ver}".deb" +debname="TDengine-"${tdengine_ver}".deb" # make deb package dpkg -b ${pkg_dir} $debname diff --git a/packaging/rpm/makerpm.sh b/packaging/rpm/makerpm.sh index 3b0f1d098e..e301f5ece9 100755 --- a/packaging/rpm/makerpm.sh +++ b/packaging/rpm/makerpm.sh @@ -2,6 +2,9 @@ # # Generate rpm package for centos +#set -e +#set -x + #curr_dir=$(pwd) compile_dir=$1 output_dir=$2 @@ -24,8 +27,25 @@ if command -v sudo > /dev/null; then csudo="sudo" fi +function cp_rpm_package() { +local cur_dir +cd $1 +cur_dir=$(pwd) + +for dirlist in $(ls ${cur_dir}); do + if test -d ${dirlist}; then + cd ${dirlist} + cp_rpm_package ${cur_dir}/${dirlist} + cd .. + fi + if test -e ${dirlist}; then + cp ${cur_dir}/${dirlist} ${output_dir}/TDengine-${tdengine_ver}.rpm + fi +done +} + if [ -d ${pkg_dir} ]; then - ${csudo} rm -rf ${pkg_dir} + ${csudo} rm -rf ${pkg_dir} fi ${csudo} mkdir -p ${pkg_dir} cd ${pkg_dir} @@ -35,7 +55,8 @@ ${csudo} mkdir -p BUILD BUILDROOT RPMS SOURCES SPECS SRPMS ${csudo} rpmbuild --define="_version ${tdengine_ver}" --define="_topdir ${pkg_dir}" --define="_compiledir ${compile_dir}" -bb ${spec_file} # copy rpm package to output_dir, then clean temp dir -#echo "rmpbuild end, cur_dir: $(pwd) " -${csudo} cp -rf RPMS/* ${output_dir} +#${csudo} cp -rf RPMS/* ${output_dir} +cp_rpm_package ${pkg_dir}/RPMS + cd .. ${csudo} rm -rf ${pkg_dir} diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index c0c0eacfae..ef02fb90fc 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -2,7 +2,7 @@ %define cfg_install_dir /etc/taos %define __strip /bin/true -Name: tdengine +Name: TDengine Version: %{_version} Release: 3%{?dist} Summary: tdengine from taosdata diff --git a/packaging/tools/makeclient.sh b/packaging/tools/makeclient.sh index 7e22116ac5..b4948bb3a7 100755 --- a/packaging/tools/makeclient.sh +++ b/packaging/tools/makeclient.sh @@ -19,8 +19,8 @@ code_dir="${top_dir}/src" release_dir="${top_dir}/release" community_dir="${script_dir}/../../../community/src" -package_name='linux' -install_dir="${release_dir}/taos-client-${version}-${package_name}-$(echo ${build_time}| tr ': ' -)" +#package_name='linux' +install_dir="${release_dir}/TDengine-client-${version}" # Directories and files. bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${script_dir}/remove_client.sh" diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 24f3a0b8d1..714b74dbe6 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -16,8 +16,8 @@ build_dir="${compile_dir}/build" code_dir="${top_dir}/src" release_dir="${top_dir}/release" -package_name='linux' -install_dir="${release_dir}/taos-${version}-${package_name}-$(echo ${build_time}| tr ': ' -)" +#package_name='linux' +install_dir="${release_dir}/TDengine-${version}" # Directories and files. bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/taosdump ${script_dir}/remove.sh" diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 2eb9893556..ecda6912c3 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -29,9 +29,19 @@ #elif defined(DARWIN) char CLIENT_VERSION[] = "Welcome to the TDengine shell from mac, client version:%s "; #else - char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, client version:%s "; + #ifdef CLUSTER + char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, enterprise client version:%s "; + #else + char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, community client version:%s "; + #endif #endif -char SERVER_VERSION[] = "server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; + +#ifdef CLUSTER + char SERVER_VERSION[] = "enterprise server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; +#else + char SERVER_VERSION[] = "community server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; +#endif + char PROMPT_HEADER[] = "taos> "; char CONTINUE_PROMPT[] = " -> "; int prompt_size = 6; diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 0ece4efbb2..ad8bf6c5c3 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -105,6 +105,15 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, args_doc, doc}; void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { + char verType[32] = {0}; + #ifdef CLUSTER + sprintf(verType, "enterprise version: %s\n", version); + #else + sprintf(verType, "community version: %s\n", version); + #endif + + argp_program_version = verType; + argp_parse(&argp, argc, argv, 0, 0, arguments); if (arguments->abort) { error(10, 0, "ABORTED"); diff --git a/src/system/detail/src/dnodeService.c b/src/system/detail/src/dnodeService.c index 9764afc593..f03bd5f3bb 100644 --- a/src/system/detail/src/dnodeService.c +++ b/src/system/detail/src/dnodeService.c @@ -55,7 +55,12 @@ int main(int argc, char *argv[]) { exit(EXIT_FAILURE); } } else if (strcmp(argv[i], "-V") == 0) { - printf("version: %s compatible_version: %s\n", version, compatible_version); + #ifdef CLUSTER + printf("enterprise version: %s compatible_version: %s\n", version, compatible_version); + #else + printf("community version: %s compatible_version: %s\n", version, compatible_version); + #endif + printf("gitinfo: %s\n", gitinfo); printf("buildinfo: %s\n", buildinfo); return 0; From 2002c8e20b0774be018e4d58d7f0e92a1c2919ac Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 20:34:32 +0800 Subject: [PATCH 15/55] rror may occur when reading http chunked body --- src/modules/http/src/httpHandle.c | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/modules/http/src/httpHandle.c b/src/modules/http/src/httpHandle.c index 16e8378fb8..c736825b37 100644 --- a/src/modules/http/src/httpHandle.c +++ b/src/modules/http/src/httpHandle.c @@ -293,19 +293,14 @@ bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); if (dataReadLen > pParser->data.len) { - httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d", - pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); + httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d", + pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); return HTTP_CHECK_BODY_ERROR; } else if (dataReadLen < pParser->data.len) { - httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read", - pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); - if (!httpReadDataImp(pContext)) { - httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return HTTP_CHECK_BODY_ERROR; - } else { - return HTTP_CHECK_BODY_CONTINUE; - } + httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", + pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); + return HTTP_CHECK_BODY_CONTINUE; } else { return HTTP_CHECK_BODY_SUCCESS; } From 5d25f776a4bcb49b432e9809301eb623ec98cc2f Mon Sep 17 00:00:00 2001 From: fang Date: Mon, 2 Dec 2019 22:02:11 +0800 Subject: [PATCH 16/55] python connector user guide --- .../tdenginedocs-cn/connector/index.html | 90 +++++++++++++++---- .../tdenginedocs-en/connector/index.html | 66 +++++++++++++- 2 files changed, 138 insertions(+), 18 deletions(-) diff --git a/documentation/tdenginedocs-cn/connector/index.html b/documentation/tdenginedocs-cn/connector/index.html index fca343e977..d808e42dda 100644 --- a/documentation/tdenginedocs-cn/connector/index.html +++ b/documentation/tdenginedocs-cn/connector/index.html @@ -114,23 +114,83 @@ public Connection getConn() throws Exception{

对于TDengine操作的报错信息,用户可使用JDBCDriver包里提供的枚举类TSDBError.java来获取error message和error code的列表。对于更多的具体操作的相关代码,请参考TDengine提供的使用示范项目JDBCDemo

Python Connector

-

Python客户端安装

-

用户可以在源代码的src/connector/python文件夹下找到python2和python3的安装包。用户可以通过pip命令安装:

-

pip install src/connector/python/python2/

-

-

pip install src/connector/python/python3/

-

如果机器上没有pip命令,用户可将src/connector/python/python3或src/connector/python/python2下的taos文件夹拷贝到应用程序的目录使用。

-

Python客户端接口

+

安装准备

+
  • 已安装TDengine, 如果客户端在Windows上,需要安装Windows 版本的TDengine客户端
  • +
  • 已安装python 2.7 or >= 3.4
  • +
  • 已安装pip
  • +

    安装

    +

    Linux

    +

    用户可以在源代码的src/connector/python文件夹下找到python2和python3的安装包, 然后通过pip命令安装

    +
    pip install src/connector/python/linux/python2/
    +

    或者

    +
    pip install src/connector/python/linux/python3/
    +

    Windows

    +

    在已安装Windows 客户端的情况下, 将文件"C:\TDengine\driver\taos.dll" 拷贝到 "C:\windows\system32" 目录下, 然后进入Windwos cmd 命令行界面

    +
    cd C:\TDengine\connector\python\windows
    +
    pip install python2\
    +

    或者

    +
    cd C:\TDengine\connector\python\windows
    +
    pip install python3\
    +

    * 如果机器上没有pip命令,用户可将src/connector/python/windows/python3或src/connector/python/windows/python2下的taos文件夹拷贝到应用程序的目录使用。

    +

    Python client interfaces

    在使用TDengine的python接口时,需导入TDengine客户端模块:

    -
    import taos 
    -

    用户可通过python的帮助信息直接查看模块的使用信息,或者参考code/examples/python中的示例程序。以下为部分常用类和方法:

    +
    import taos 
    +

    代码示例

    +
  • 获取连接
  • +
    
    +conn = taos.connect(host="127.0.0.1", user="root", password="taosdata", config="/etc/taos")
    +c1 = conn.cursor()
    +
    +
  • 写入数据
  • +
    
    +import datetime
    + 
    +# create a database
    +c1.execute('create database db')
    +c1.execute('use db')
    +# create a table
    +c1.execute('create table tb (ts timestamp, temperature int, humidity float)')
    +# insert a record
    +start_time = datetime.datetime(2019, 11, 1)
    +affected_rows = c1.execute('insert into tb values (\'%s\', 0, 0.0)' %start_time)
    +# insert multiple records in a batch
    +time_interval = datetime.timedelta(seconds=60)
    +sqlcmd = ['insert into tb values']
    +for irow in range(1,11):
    +  start_time += time_interval
    +  sqlcmd.append('(\'%s\', %d, %f)' %(start_time, irow, irow*1.2))
    +affected_rows = c1.execute(' '.join(sqlcmd))
    +
    +
  • 写入数据
  • +
    +c1.execute('select * from tb')
    +# fetch all returned results
    +data = c1.fetchall()
    +# data is a list of returned rows with each row being a tuple
    +numOfRows = c1.rowcount
    +numOfCols = c1.descriptions
    +for irow in range(numOfRows):
    +  print("Row%d: ts=%s, temperature=%d, humidity=%f" %(irow, data[irow][0], data[irow][1],data[irow][2])
    +  
    +# use the cursor as an iterator to retrieve all returned results
    +c1.execute('select * from tb')
    +for data in c1:
    +  print("ts=%s, temperature=%d, humidity=%f" %(data[0], data[1],data[2])
    +
    +
  • 关闭连接
  • +
    +c1.close()
    +conn.close()
    +
    +

    帮助信息

    +

    用户可通过python的帮助信息直接查看模块的使用信息,或者参考code/examples/python中的示例程序。以下为部分常用类和方法:

      -
    • TaosConnection

      -

      参考python中help(taos.TaosConnection)。

    • -
    • TaosCursor

      -

      参考python中help(taos.TaosCursor)。

    • -
    • connect方法

      -

      用于生成taos.TaosConnection的实例。

    • +
    • TaosConnection

      +

      参考python中help(taos.TDengineConnection)

    • +
    • TaosCursor

      +

      参考python中help(taos.TDengineCursor)

    • +
    • connect 方法

      +

      用于生成taos.TDengineConnection的实例。

    RESTful Connector

    为支持各种不同类型平台的开发,TDengine提供符合REST设计标准的API,即RESTful API。为最大程度降低学习成本,不同于其他数据库RESTful API的设计方法,TDengine直接通过HTTP POST 请求BODY中包含的SQL语句来操作数据库,仅需要一个URL。

    diff --git a/documentation/tdenginedocs-en/connector/index.html b/documentation/tdenginedocs-en/connector/index.html index ce32c062ff..ba1cbcaf85 100644 --- a/documentation/tdenginedocs-en/connector/index.html +++ b/documentation/tdenginedocs-en/connector/index.html @@ -122,15 +122,75 @@ public Connection getConn() throws Exception{

    All the error codes and error messages can be found in TSDBError.java . For a more detailed coding example, please refer to the demo project JDBCDemo in TDengine's code examples.

    Python Connector

    +

    Pre-requirement

    +
  • TDengine installed, TDengine-client installed if on Windows
  • +
  • python 2.7 or >= 3.4
  • +
  • pip installed
  • Install TDengine Python client

    +

    Linux

    Users can find python client packages in our source code directory src/connector/python. There are two directories corresponding two python versions. Please choose the correct package to install. Users can use pip command to install:

    -
    pip install src/connector/python/python2/
    +
    pip install src/connector/python/linux/python2/

    or

    -
    pip install src/connector/python/python3/
    -

    If pip command is not installed on the system, users can choose to install pip or just copy the taos directory in the python client directory to the application directory to use.

    +
    pip install src/connector/python/linux/python3/
    +

    Windows

    +

    Assume you have installed the Windows client , copy the file "C:\TDengine\driver\taos.dll" to the folder "C:\windows\system32", and then enter the cmd command interface

    +
    cd C:\TDengine\connector\python\windows
    +
    pip install python2\
    +

    or

    +
    cd C:\TDengine\connector\python\windows
    +
    pip install python3\
    +

    * If pip command is not installed on the system, users can choose to install pip or just copy the taos directory in the python client directory to the application directory to use.

    Python client interfaces

    To use TDengine Python client, import TDengine module at first:

    import taos 
    +

    Examples

    +
  • get the connection
  • +
    
    +conn = taos.connect(host="127.0.0.1", user="root", password="taosdata", config="/etc/taos")
    +c1 = conn.cursor()
    +
    +
  • insert records into the database
  • +
    
    +import datetime
    + 
    +# create a database
    +c1.execute('create database db')
    +c1.execute('use db')
    +# create a table
    +c1.execute('create table tb (ts timestamp, temperature int, humidity float)')
    +# insert a record
    +start_time = datetime.datetime(2019, 11, 1)
    +affected_rows = c1.execute('insert into tb values (\'%s\', 0, 0.0)' %start_time)
    +# insert multiple records in a batch
    +time_interval = datetime.timedelta(seconds=60)
    +sqlcmd = ['insert into tb values']
    +for irow in range(1,11):
    +  start_time += time_interval
    +  sqlcmd.append('(\'%s\', %d, %f)' %(start_time, irow, irow*1.2))
    +affected_rows = c1.execute(' '.join(sqlcmd))
    +
    +
  • query the database
  • +
    +c1.execute('select * from tb')
    +# fetch all returned results
    +data = c1.fetchall()
    +# data is a list of returned rows with each row being a tuple
    +numOfRows = c1.rowcount
    +numOfCols = c1.descriptions
    +for irow in range(numOfRows):
    +  print("Row%d: ts=%s, temperature=%d, humidity=%f" %(irow, data[irow][0], data[irow][1],data[irow][2])
    +  
    +# use the cursor as an iterator to retrieve all returned results
    +c1.execute('select * from tb')
    +for data in c1:
    +  print("ts=%s, temperature=%d, humidity=%f" %(data[0], data[1],data[2])
    +
    +
  • close the connection
  • +
    +c1.close()
    +conn.close()
    +
    +

    Help information

    Users can get module information from Python help interface or refer to our [python code example](). We list the main classes and methods below: