From 6e14f9dc8ba8e3a3ef6839f2123973dc33ad38c8 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 9 Nov 2019 14:37:44 +0800 Subject: [PATCH 01/16] Client and server versions may not match --- src/client/src/tscSql.c | 43 +++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a2d4abbca3..0e1fead70e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -138,34 +138,35 @@ TAOS *taos_connect(char *ip, char *user, char *pass, char *db, int port) { if (taos != NULL) { STscObj* pObj = (STscObj*) taos; - int clientVersionNumber[4] = {0}; - if (!taosGetVersionNumber(version, clientVersionNumber)) { - tscError("taos:%p, invalid client version:%s", taos, version); - //pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; - globalCode = TSDB_CODE_INVALID_CLIENT_VERSION; - taos_close(taos); - return NULL; - } - - char *server_version = taos_get_server_info(taos); - int serverVersionNumber[4] = {0}; - if (!taosGetVersionNumber(server_version, serverVersionNumber)) { - tscError("taos:%p, invalid server version:%s", taos, server_version); - //pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; - globalCode = TSDB_CODE_INVALID_CLIENT_VERSION; - taos_close(taos); - return NULL; - } - // version compare only requires the first 3 segments of the version string int32_t comparedSegments = 3; + char client_version[64] = {0}; + char server_version[64] = {0}; + int clientVersionNumber[4] = {0}; + int serverVersionNumber[4] = {0}; + + strcpy(client_version, version); + strcpy(server_version, taos_get_server_info(taos)); + + if (!taosGetVersionNumber(client_version, clientVersionNumber)) { + tscError("taos:%p, invalid client version:%s", taos, client_version); + pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; + taos_close(taos); + return NULL; + } + + if (!taosGetVersionNumber(server_version, serverVersionNumber)) { + tscError("taos:%p, invalid server version:%s", taos, server_version); + pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; + taos_close(taos); + return NULL; + } for(int32_t i = 0; i < comparedSegments; ++i) { if (clientVersionNumber[i] != serverVersionNumber[i]) { tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection", taos, i, server_version, version); - //pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; - globalCode = TSDB_CODE_INVALID_CLIENT_VERSION; + pObj->pSql->res.code = TSDB_CODE_INVALID_CLIENT_VERSION; taos_close(taos); return NULL; } From 0d2eb1bf0848bca00377a8ac5515f632c622dfa5 Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 15 Nov 2019 18:44:00 +0800 Subject: [PATCH 02/16] [TBASE-1155] --- src/os/darwin/src/tdarwin.c | 4 ---- src/os/linux/src/tsystem.c | 19 ------------------- src/os/windows/src/twindows.c | 4 ---- src/system/src/vnodeImport.c | 3 ++- src/util/src/tutil.c | 19 +++++++++++++++++++ 5 files changed, 21 insertions(+), 28 deletions(-) diff --git a/src/os/darwin/src/tdarwin.c b/src/os/darwin/src/tdarwin.c index 2f97c7c376..71e8a56466 100644 --- a/src/os/darwin/src/tdarwin.c +++ b/src/os/darwin/src/tdarwin.c @@ -243,10 +243,6 @@ int taosInitTimer(void (*callback)(int), int ms) { return setitimer(ITIMER_REAL, &tv, NULL); } -char *taosCharsetReplace(char *charsetstr) { - return charsetstr; -} - void taosGetSystemTimezone() { // get and set default timezone SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); diff --git a/src/os/linux/src/tsystem.c b/src/os/linux/src/tsystem.c index 0989f007b3..e1f771759a 100644 --- a/src/os/linux/src/tsystem.c +++ b/src/os/linux/src/tsystem.c @@ -210,25 +210,6 @@ void taosGetSystemTimezone() { pPrint("timezone not configured, set to system default:%s", tsTimezone); } -typedef struct CharsetPair { - char *oldCharset; - char *newCharset; -} CharsetPair; - -char *taosCharsetReplace(char *charsetstr) { - CharsetPair charsetRep[] = { - {"utf8", "UTF-8"}, {"936", "CP936"}, - }; - - for (int32_t i = 0; i < tListLen(charsetRep); ++i) { - if (strcasecmp(charsetRep[i].oldCharset, charsetstr) == 0) { - return strdup(charsetRep[i].newCharset); - } - } - - return strdup(charsetstr); -} - /* * POSIX format locale string: * (Language Strings)_(Country/Region Strings).(code_page) diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 75bc8c2839..53563341c4 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -96,10 +96,6 @@ void __sync_val_restore_32(int32_t *ptr, int32_t newval) { void tsPrintOsInfo() {} -char *taosCharsetReplace(char *charsetstr) { - return charsetstr; -} - void taosGetSystemTimezone() { // get and set default timezone SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 9691206bf9..bfffc66aa5 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -546,6 +546,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { return code; } + assert(rows); dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); @@ -781,7 +782,7 @@ int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) { pImport->importedRows = rows; code = vnodeImportToCache(pImport, payload, rows); } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to cache", pObj->vnode, pObj->sid, pObj->meterId); + dTrace("vid:%d sid:%d id:%s, data is already imported to cache, firstKey:%lld", pObj->vnode, pObj->sid, pObj->meterId, pImport->firstKey); } return code; diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index cb93319900..0b407a848b 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -508,3 +508,22 @@ char *taosIpStr(int ipInt) { sprintf(ipStr, "0x%x:%d.%d.%d.%d", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, ipInt >> 24); return ipStr; } + +typedef struct CharsetPair { + char *oldCharset; + char *newCharset; +} CharsetPair; + +char *taosCharsetReplace(char *charsetstr) { + CharsetPair charsetRep[] = { + { "utf8", "UTF-8" }, { "936", "CP936" }, + }; + + for (int32_t i = 0; i < tListLen(charsetRep); ++i) { + if (strcasecmp(charsetRep[i].oldCharset, charsetstr) == 0) { + return strdup(charsetRep[i].newCharset); + } + } + + return strdup(charsetstr); +} \ No newline at end of file From faca8084e11dd83c07ba4c18dced20c54175f5a4 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 1 Dec 2019 13:24:12 +0800 Subject: [PATCH 03/16] resolve conflicts --- src/os/windows/src/twindows.c | 3 - src/system/src/vnodeImport.c | 985 ---------------------------------- src/util/src/tutil.c | 22 - 3 files changed, 1010 deletions(-) delete mode 100644 src/system/src/vnodeImport.c diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 49c2466669..98be6b60ba 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -132,8 +132,6 @@ short interlocked_or_fetch_16(short volatile* ptr, short val) { return _InterlockedOr16(ptr, val) | val; } -<<<<<<< HEAD -======= long interlocked_or_fetch_32(long volatile* ptr, long val) { return _InterlockedOr(ptr, val) | val; } @@ -207,7 +205,6 @@ __int64 interlocked_fetch_xor_64(__int64 volatile* ptr, __int64 val) { void tsPrintOsInfo() {} ->>>>>>> release/v1.6.4.0 void taosGetSystemTimezone() { // get and set default timezone SGlobalConfig *cfg_timezone = tsGetConfigOption("timezone"); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c deleted file mode 100644 index bfffc66aa5..0000000000 --- a/src/system/src/vnodeImport.c +++ /dev/null @@ -1,985 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include - -#include "trpc.h" -#include "ttimer.h" -#include "vnode.h" -#include "vnodeMgmt.h" -#include "vnodeShell.h" -#include "vnodeShell.h" -#include "vnodeUtil.h" -#pragma GCC diagnostic ignored "-Wpointer-sign" -#pragma GCC diagnostic ignored "-Wint-conversion" - -typedef struct { - SCompHeader *headList; - SCompInfo compInfo; - int last; // 0:last block in data file, 1:not the last block - int newBlocks; - int oldNumOfBlocks; - int64_t compInfoOffset; // offset for compInfo in head file - int64_t leftOffset; // copy from this offset to end of head file - int64_t hfdSize; // old head file size -} SHeadInfo; - -typedef struct { - void *signature; - SShellObj *pShell; - SMeterObj *pObj; - int retry; - TSKEY firstKey; - TSKEY lastKey; - int importedRows; - int commit; // start to commit if it is set to 1 - - int slot; // slot/block to start writing the import data - int pos; // pos to start writing the import data in the slot/block - TSKEY key; - - // only for file - int numOfPoints; - int fileId; - int64_t offset; // offset in data file - SData *sdata[TSDB_MAX_COLUMNS]; - char *buffer; - char *payload; - char *opayload; // allocated space for payload from client - int rows; -} SImportInfo; - -int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport); - -int vnodeGetImportStartPart(SMeterObj *pObj, char *payload, int rows, TSKEY key1) { - int i; - - for (i = 0; i < rows; ++i) { - TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); - if (key >= key1) break; - } - - return i; -} - -int vnodeGetImportEndPart(SMeterObj *pObj, char *payload, int rows, char **pStart, TSKEY key0) { - int i; - - for (i = 0; i < rows; ++i) { - TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); - if (key > key0) break; - } - - *pStart = payload + i * pObj->bytesPerPoint; - return rows - i; -} - -int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - TSCKSUM chksum = 0; - - if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0; - - if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); - - int leftSize = pHinfo->hfdSize - pHinfo->leftOffset; - if (leftSize > 0) { - lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize); - } - - pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks; - int offset = (pHinfo->compInfo.numOfBlocks - pHinfo->oldNumOfBlocks) * sizeof(SCompBlock); - if (pHinfo->oldNumOfBlocks == 0) offset += sizeof(SCompInfo) + sizeof(TSCKSUM); - - pHinfo->headList[pObj->sid].compInfoOffset = pHinfo->compInfoOffset; - for (int sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) { - if (pHinfo->headList[sid].compInfoOffset) pHinfo->headList[sid].compInfoOffset += offset; - } - - lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize); - twrite(pVnode->nfd, pHinfo->headList, tmsize); - - int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); - char *buffer = malloc(size); - lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - read(pVnode->nfd, buffer, size); - SCompBlock *pBlock = (SCompBlock *)(buffer + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock)); - - pHinfo->compInfo.uid = pObj->uid; - pHinfo->compInfo.delimiter = TSDB_VNODE_DELIMITER; - pHinfo->compInfo.last = pBlock->last; - - taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo)); - lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET); - twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); - - chksum = taosCalcChecksum(0, (uint8_t *)buffer, size); - lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET); - twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); - free(buffer); - - vnodeCloseCommitFiles(pVnode); - - return 0; -} - -int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCompBlock lastBlock; - int code = 0; - - if (pHinfo->compInfo.last == 0) return 0; - - // read into memory - uint64_t offset = - pHinfo->compInfoOffset + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock) + sizeof(SCompInfo); - lseek(pVnode->hfd, offset, SEEK_SET); - read(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); - assert(lastBlock.last); - - if (lastBlock.sversion != pObj->sversion) { - lseek(pVnode->lfd, lastBlock.offset, SEEK_SET); - lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END); - tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len); - - lastBlock.last = 0; - lseek(pVnode->hfd, offset, SEEK_SET); - twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); - } else { - vnodeReadLastBlockToMem(pObj, &lastBlock, data); - pHinfo->compInfo.numOfBlocks--; - code = lastBlock.numOfPoints; - } - - return code; -} - -int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - TSKEY firstKey = *((TSKEY *)payload); - struct stat filestat; - int sid, rowsBefore = 0; - - if (pVnode->nfd <= 0 || firstKey > pVnode->commitLastKey) { - if (pVnode->nfd > 0) vnodeCloseFileForImport(pObj, pHinfo); - - pVnode->commitFirstKey = firstKey; - if (vnodeOpenCommitFiles(pVnode, pObj->sid) < 0) return -1; - - fstat(pVnode->hfd, &filestat); - pHinfo->hfdSize = filestat.st_size; - pHinfo->newBlocks = 0; - pHinfo->last = 1; // by default, new blockes are at the end of block list - - lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - read(pVnode->hfd, pHinfo->headList, sizeof(SCompHeader) * pCfg->maxSessions); - - if (pHinfo->headList[pObj->sid].compInfoOffset > 0) { - lseek(pVnode->hfd, pHinfo->headList[pObj->sid].compInfoOffset, SEEK_SET); - if (read(pVnode->hfd, &pHinfo->compInfo, sizeof(SCompInfo)) != sizeof(SCompInfo)) { - dError("vid:%d sid:%d, failed to read compInfo from file:%s", pObj->vnode, pObj->sid, pVnode->cfn); - return -1; - } - - if (pHinfo->compInfo.uid == pObj->uid) { - pHinfo->compInfoOffset = pHinfo->headList[pObj->sid].compInfoOffset; - pHinfo->leftOffset = pHinfo->headList[pObj->sid].compInfoOffset + sizeof(SCompInfo); - } else { - pHinfo->headList[pObj->sid].compInfoOffset = 0; - } - } - - if ( pHinfo->headList[pObj->sid].compInfoOffset == 0 ) { - memset(&pHinfo->compInfo, 0, sizeof(SCompInfo)); - pHinfo->compInfo.uid = pObj->uid; - - for (sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) - if (pHinfo->headList[sid].compInfoOffset > 0) break; - - pHinfo->compInfoOffset = (sid == pCfg->maxSessions) ? pHinfo->hfdSize : pHinfo->headList[sid].compInfoOffset; - pHinfo->leftOffset = pHinfo->compInfoOffset; - } - - pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks; - lseek(pVnode->hfd, 0, SEEK_SET); - lseek(pVnode->nfd, 0, SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset); - twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); - if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR); - - if (pVnode->commitFileId < pImport->fileId) { - if (pHinfo->compInfo.numOfBlocks > 0) - pHinfo->leftOffset += pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); - - rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); - - // copy all existing compBlockInfo - lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - if (pHinfo->compInfo.numOfBlocks > 0) - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock)); - - } else if (pVnode->commitFileId == pImport->fileId) { - int slots = pImport->pos ? pImport->slot + 1 : pImport->slot; - assert(slots >= 0); - pHinfo->leftOffset += slots * sizeof(SCompBlock); - - // check if last block is at last file, if it is, read into memory - if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks && - pHinfo->compInfo.last) { - rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); - if ( rowsBefore > 0 ) pImport->slot--; - } - - // this block will be replaced by new blocks - if (pImport->pos > 0) pHinfo->compInfo.numOfBlocks--; - - if (pImport->slot > 0) { - lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock)); - } - - if (pImport->slot < pHinfo->compInfo.numOfBlocks) - pHinfo->last = 0; // new blocks are not at the end of block list - - } else { - // nothing - - pHinfo->last = 0; // new blocks are not at the end of block list - } - } - - return rowsBefore; -} - -extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints); -int vnodeImportToFile(SImportInfo *pImport); - -void vnodeProcessImportTimer(void *param, void *tmrId) { - SImportInfo *pImport = (SImportInfo *)param; - if (pImport == NULL || pImport->signature != param) { - dError("import timer is messed up, signature:%p", pImport); - return; - } - - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj *pShell = pImport->pShell; - - pImport->retry++; - - //slow query will block the import operation - int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); - if (state >= TSDB_METER_STATE_DELETING) { - dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", - pObj->vnode, pObj->sid, pObj->meterId, state); - return; - } - - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); - - //if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY - int32_t commitInProcess = 0; - pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) { - pthread_mutex_unlock(&pPool->vmutex); - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - - if (pImport->retry < 1000) { - dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." - "commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId, - commitInProcess, num, state); - - taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); - return; - } else { - pShell->code = TSDB_CODE_TOO_SLOW; - } - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, pImport); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pImport->importedRows; - } - } - - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - - pVnode->version++; - - // send response back to shell - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pImport->pShell, pShell->code, pShell->numOfTotalPoints); - } - - pImport->signature = NULL; - free(pImport->opayload); - free(pImport); -} - -int vnodeImportToFile(SImportInfo *pImport) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - SHeadInfo headInfo; - int code = 0, col; - SCompBlock compBlock; - char *payload = pImport->payload; - int rows = pImport->rows; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - - TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); - TSKEY firstKey = *((TSKEY *)payload); - memset(&headInfo, 0, sizeof(headInfo)); - headInfo.headList = malloc(sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM)); - - SData *cdata[TSDB_MAX_COLUMNS]; - char * buffer1 = - malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns); - cdata[0] = (SData *)buffer1; - - SData *data[TSDB_MAX_COLUMNS]; - char * buffer2 = - malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns); - data[0] = (SData *)buffer2; - - for (col = 1; col < pObj->numOfColumns; ++col) { - cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - } - - int rowsBefore = 0; - int rowsRead = 0; - int rowsUnread = 0; - int leftRows = rows; // left number of rows of imported data - int row, rowsToWrite; - int64_t offset[TSDB_MAX_COLUMNS]; - - if (pImport->pos > 0) { - for (col = 0; col < pObj->numOfColumns; ++col) - memcpy(data[col]->data, pImport->sdata[col]->data, pImport->pos * pObj->schema[col].bytes); - - rowsBefore = pImport->pos; - rowsRead = pImport->pos; - rowsUnread = pImport->numOfPoints - pImport->pos; - } - - dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to file, firstKey:%ld lastKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); - do { - if (leftRows > 0) { - code = vnodeOpenFileForImport(pImport, payload, &headInfo, data); - if (code < 0) goto _exit; - if (code > 0) { - rowsBefore = code; - code = 0; - }; - } else { - // if payload is already imported, rows unread shall still be processed - rowsBefore = 0; - } - - int rowsToProcess = pObj->pointsPerFileBlock - rowsBefore; - if (rowsToProcess > leftRows) rowsToProcess = leftRows; - - for (col = 0; col < pObj->numOfColumns; ++col) { - offset[col] = data[col]->data + rowsBefore * pObj->schema[col].bytes; - } - - row = 0; - if (leftRows > 0) { - for (row = 0; row < rowsToProcess; ++row) { - if (*((TSKEY *)payload) > pVnode->commitLastKey) break; - - for (col = 0; col < pObj->numOfColumns; ++col) { - memcpy((void *)offset[col], payload, pObj->schema[col].bytes); - payload += pObj->schema[col].bytes; - offset[col] += pObj->schema[col].bytes; - } - } - } - - leftRows -= row; - rowsToWrite = rowsBefore + row; - rowsBefore = 0; - - if (leftRows == 0 && rowsUnread > 0) { - // copy the unread - int rowsToCopy = pObj->pointsPerFileBlock - rowsToWrite; - if (rowsToCopy > rowsUnread) rowsToCopy = rowsUnread; - - for (col = 0; col < pObj->numOfColumns; ++col) { - int bytes = pObj->schema[col].bytes; - memcpy(data[col]->data + rowsToWrite * bytes, pImport->sdata[col]->data + rowsRead * bytes, rowsToCopy * bytes); - } - - rowsRead += rowsToCopy; - rowsUnread -= rowsToCopy; - rowsToWrite += rowsToCopy; - } - - for (col = 0; col < pObj->numOfColumns; ++col) { - data[col]->len = rowsToWrite * pObj->schema[col].bytes; - } - - compBlock.last = headInfo.last; - vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite); - twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)); - - rowsToWrite = 0; - headInfo.newBlocks++; - - } while (leftRows > 0 || rowsUnread > 0); - - if (compBlock.keyLast > pObj->lastKeyOnFile) - pObj->lastKeyOnFile = compBlock.keyLast; - - vnodeCloseFileForImport(pObj, &headInfo); - dTrace("vid:%d sid:%d id:%s, %d rows data are imported to file", pObj->vnode, pObj->sid, pObj->meterId, rows); - - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - pthread_mutex_lock(&pPool->vmutex); - - if (pInfo->numOfBlocks > 0) { - int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; - TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0])); - - // data may be in commited cache, cache shall be released - if (lastKey > firstKeyInCache) { - while (slot != pInfo->commitSlot) { - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - vnodeFreeCacheBlock(pCacheBlock); - slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; - } - - // last slot, the uncommitted slots shall be shifted - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - int points = pCacheBlock->numOfPoints - pInfo->commitPoint; - if (points > 0) { - for (int col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memmove(pCacheBlock->offset[col], pCacheBlock->offset[col] + pObj->schema[col].bytes * pInfo->commitPoint, size); - } - } - - if (pInfo->commitPoint != pObj->pointsPerBlock) { - // commit point shall be set to 0 if last block is not full - pInfo->commitPoint = 0; - pCacheBlock->numOfPoints = points; - if (slot == pInfo->currentSlot) { - __sync_fetch_and_add(&pObj->freePoints, pInfo->commitPoint); - } - } else { - // if last block is full and committed - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - if (pCacheBlock->pMeterObj == pObj) { - vnodeFreeCacheBlock(pCacheBlock); - } - } - } - } - - if (lastKey > pObj->lastKeyOnFile) pObj->lastKeyOnFile = lastKey; - - pthread_mutex_unlock(&pPool->vmutex); - -_exit: - tfree(headInfo.headList); - tfree(buffer1); - tfree(buffer2); - tfree(pImport->buffer); - - return code; -} - -int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - int code = -1; - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - int slot, pos, row, col, points, tpoints; - - char *data[TSDB_MAX_COLUMNS], *current[TSDB_MAX_COLUMNS]; - int slots = pInfo->unCommittedBlocks + 1; - int trows = slots * pObj->pointsPerBlock + rows; // max rows in buffer - int tsize = (trows / pObj->pointsPerBlock + 1) * pCfg->cacheBlockSize; - TSKEY firstKey = *((TSKEY *)payload); - TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); - - if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { - dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->freePoints); - pImport->importedRows = 0; - pImport->commit = 1; - code = TSDB_CODE_ACTION_IN_PROGRESS; - return code; - } - - assert(rows); - dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); - - pthread_mutex_lock(&(pVnode->vmutex)); - if (firstKey < pVnode->firstKey) pVnode->firstKey = firstKey; - pthread_mutex_unlock(&(pVnode->vmutex)); - - char *buffer = malloc(tsize); // buffer to hold unCommitted data plus import data - data[0] = buffer; - current[0] = data[0]; - for (col = 1; col < pObj->numOfColumns; ++col) { - data[col] = data[col - 1] + trows * pObj->schema[col - 1].bytes; - current[col] = data[col]; - } - - // write import data into buffer first - for (row = 0; row < rows; ++row) { - for (col = 0; col < pObj->numOfColumns; ++col) { - memcpy(current[col], payload, pObj->schema[col].bytes); - payload += pObj->schema[col].bytes; - current[col] += pObj->schema[col].bytes; - } - } - - // copy the overwritten data into buffer - tpoints = rows; - pos = pImport->pos; - slot = pImport->slot; - while (1) { - points = pInfo->cacheBlocks[slot]->numOfPoints - pos; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(current[col], pInfo->cacheBlocks[slot]->offset[col] + pos * pObj->schema[col].bytes, size); - current[col] += size; - } - pos = 0; - tpoints += points; - - if (slot == pInfo->currentSlot) break; - slot = (slot + 1) % pInfo->maxBlocks; - } - - for (col = 0; col < pObj->numOfColumns; ++col) current[col] = data[col]; - pos = pImport->pos; - - // write back to existing slots first - slot = pImport->slot; - while (1) { - points = (tpoints > pObj->pointsPerBlock - pos) ? pObj->pointsPerBlock - pos : tpoints; - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); - current[col] += size; - } - pCacheBlock->numOfPoints = points + pos; - pos = 0; - tpoints -= points; - - if (slot == pInfo->currentSlot) break; - slot = (slot + 1) % pInfo->maxBlocks; - } - - // allocate new cache block if there are still data left - while (tpoints > 0) { - pImport->commit = vnodeAllocateCacheBlock(pObj); - if (pImport->commit < 0) goto _exit; - points = (tpoints > pObj->pointsPerBlock) ? pObj->pointsPerBlock : tpoints; - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[pInfo->currentSlot]; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); - current[col] += size; - } - tpoints -= points; - pCacheBlock->numOfPoints = points; - } - - code = 0; - __sync_fetch_and_sub(&pObj->freePoints, rows); - dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows); - -_exit: - free(buffer); - return code; -} - -int vnodeFindKeyInFile(SImportInfo *pImport, int order) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - int code = -1; - SQuery query; - SColumnFilter colList[TSDB_MAX_COLUMNS] = {0}; - - TSKEY key = order ? pImport->firstKey : pImport->lastKey; - memset(&query, 0, sizeof(query)); - query.order.order = order; - query.skey = key; - query.ekey = order ? INT64_MAX : 0; - query.colList = colList; - query.numOfCols = pObj->numOfColumns; - - for (int16_t i = 0; i < pObj->numOfColumns; ++i) { - colList[i].data.colId = pObj->schema[i].colId; - colList[i].data.bytes = pObj->schema[i].bytes; - colList[i].data.type = pObj->schema[i].type; - - colList[i].colIdx = i; - colList[i].colIdxInBuf = i; - } - - int ret = vnodeSearchPointInFile(pObj, &query); - - if (ret >= 0) { - if (query.slot < 0) { - pImport->slot = 0; - pImport->pos = 0; - pImport->key = 0; - pImport->fileId = pVnode->fileId - pVnode->numOfFiles + 1; - dTrace("vid:%d sid:%d id:%s, import to head of file", pObj->vnode, pObj->sid, pObj->meterId); - code = 0; - } else if (query.slot >= 0) { - code = 0; - pImport->slot = query.slot; - pImport->pos = query.pos; - pImport->key = query.key; - pImport->fileId = query.fileId; - SCompBlock *pBlock = &query.pBlock[query.slot]; - pImport->numOfPoints = pBlock->numOfPoints; - - if (pImport->key != key) { - if (order == 0) { - pImport->pos++; - - if (pImport->pos >= pBlock->numOfPoints) { - pImport->slot++; - pImport->pos = 0; - } - } else { - if (pImport->pos < 0) pImport->pos = 0; - } - } - - if (pImport->key != key && pImport->pos > 0) { - if ( pObj->sversion != pBlock->sversion ) { - dError("vid:%d sid:%d id:%s, import sversion not matached, expected:%d received:%d", pObj->vnode, pObj->sid, - pBlock->sversion, pObj->sversion); - code = TSDB_CODE_OTHERS; - } else { - pImport->offset = pBlock->offset; - - pImport->buffer = - malloc(pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + sizeof(SData) * pObj->numOfColumns); - pImport->sdata[0] = (SData *)pImport->buffer; - for (int col = 1; col < pObj->numOfColumns; ++col) - pImport->sdata[col] = (SData *)(((char *)pImport->sdata[col - 1]) + sizeof(SData) + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - - code = vnodeReadCompBlockToMem(pObj, &query, pImport->sdata); - if (code < 0) { - code = -code; - tfree(pImport->buffer); - } - } - } - } - } else { - dError("vid:%d sid:%d id:%s, file is corrupted, import failed", pObj->vnode, pObj->sid, pObj->meterId); - code = -ret; - } - - tclose(query.hfd); - tclose(query.dfd); - tclose(query.lfd); - vnodeFreeFields(&query); - tfree(query.pBlock); - - return code; -} - -int vnodeFindKeyInCache(SImportInfo *pImport, int order) { - SMeterObj *pObj = pImport->pObj; - int code = 0; - SQuery query; - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - - TSKEY key = order ? pImport->firstKey : pImport->lastKey; - memset(&query, 0, sizeof(query)); - query.order.order = order; - query.skey = key; - query.ekey = order ? pImport->lastKey : pImport->firstKey; - vnodeSearchPointInCache(pObj, &query); - - if (query.slot < 0) { - pImport->slot = pInfo->commitSlot; - if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; - pImport->pos = 0; - pImport->key = 0; - dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key); - code = 0; - } else { - pImport->slot = query.slot; - pImport->pos = query.pos; - pImport->key = query.key; - - if (key != query.key) { - if (order == 0) { - // since pos is the position which has smaller key, data shall be imported after it - pImport->pos++; - if (pImport->pos >= pObj->pointsPerBlock) { - pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; - pImport->pos = 0; - } - } else { - if (pImport->pos < 0) pImport->pos = 0; - } - } - code = 0; - } - - return code; -} - -int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInCache(pImport, 1); - if (code != 0) return code; - - if (pImport->key != pImport->firstKey) { - rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); - pImport->importedRows = rows; - code = vnodeImportToCache(pImport, payload, rows); - } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to cache, firstKey:%lld", pObj->vnode, pObj->sid, pObj->meterId, pImport->firstKey); - } - - return code; -} - -int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInFile(pImport, 1); - if (code != 0) return code; - - assert(pImport->slot >= 0); - - if (pImport->key != pImport->firstKey) { - pImport->payload = payload; - pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); - pImport->importedRows = pImport->rows; - code = vnodeImportToFile(pImport); - } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId); - } - - return code; -} - -int vnodeImportWholeToFile(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInFile(pImport, 0); - if (code != 0) return code; - - if (pImport->key != pImport->lastKey) { - pImport->payload = payload; - pImport->rows = vnodeGetImportEndPart(pObj, payload, rows, &pImport->payload, pImport->key); - pImport->importedRows = pImport->rows; - code = vnodeImportToFile(pImport); - } else { - code = vnodeImportStartToFile(pImport, payload, rows); - } - - return code; -} - -int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; - - code = vnodeFindKeyInCache(pImport, 0); - if (code != 0) return code; - - if (pImport->key != pImport->lastKey) { - char *pStart; - if ( pImport->key < pObj->lastKeyOnFile ) pImport->key = pObj->lastKeyOnFile; - rows = vnodeGetImportEndPart(pObj, payload, rows, &pStart, pImport->key); - pImport->importedRows = rows; - code = vnodeImportToCache(pImport, pStart, rows); - } else { - if (pImport->firstKey > pObj->lastKeyOnFile) { - code = vnodeImportStartToCache(pImport, payload, rows); - } else if (pImport->firstKey < pObj->lastKeyOnFile) { - code = vnodeImportStartToFile(pImport, payload, rows); - } else { // firstKey == pObj->lastKeyOnFile - dTrace("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId); - } - } - - return code; -} - -int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, - int *pNumOfPoints, TSKEY now) { - SSubmitMsg *pSubmit = (SSubmitMsg *)cont; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - int rows; - char *payload; - int code = TSDB_CODE_ACTION_IN_PROGRESS; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj *pShell = (SShellObj *)param; - int pointsImported = 0; - - rows = htons(pSubmit->numOfRows); - int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); - if (expectedLen != contLen) { - dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, - expectedLen, contLen); - return TSDB_CODE_WRONG_MSG_SIZE; - } - - if (sversion != pObj->sversion) { - dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->sversion, sversion); - return TSDB_CODE_OTHERS; - } - - payload = pSubmit->payLoad; - int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - int cfile = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) { - dError("vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1))); - return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; - } - - if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { - if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; - code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); - if (code != 0) return code; - } - - if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); - - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pointsImported; - } - - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); - } else { - SImportInfo *pNew, import; - - dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows); - memset(&import, 0, sizeof(import)); - import.firstKey = *((TSKEY *)(payload)); - import.lastKey = *((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)); - import.pObj = pObj; - import.pShell = pShell; - import.payload = payload; - import.rows = rows; - - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); - - int32_t commitInProcess = 0; - - pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { - pthread_mutex_unlock(&pPool->vmutex); - - pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); - memcpy(pNew, &import, sizeof(SImportInfo)); - pNew->signature = pNew; - int payloadLen = contLen - sizeof(SSubmitMsg); - pNew->payload = malloc(payloadLen); - pNew->opayload = pNew->payload; - memcpy(pNew->payload, payload, payloadLen); - - dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, - pObj->meterId, commitInProcess, pObj->numOfQueries); - - taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); - return 0; - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, &import); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += import.importedRows; - } - } - } - - pVnode->version++; - - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints); - } - - return 0; -} - -//todo abort from the procedure if the meter is going to be dropped -int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { - int code = 0; - - if (pImport->lastKey > pObj->lastKeyOnFile) { - code = vnodeImportWholeToCache(pImport, pImport->payload, pImport->rows); - } else if (pImport->lastKey < pObj->lastKeyOnFile) { - code = vnodeImportWholeToFile(pImport, pImport->payload, pImport->rows); - } else { // lastKey == pObj->lastkeyOnFile - code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows); - } - - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - pPool->commitInProcess = 0; - - if (pImport->commit) vnodeProcessCommitTimer(pVnode, NULL); - - return code; -} diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index bd52ac0bc4..e0dcb0aa3f 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -493,27 +493,6 @@ char *taosIpStr(uint32_t ipInt) { sprintf(ipStr, "0x%x:%u.%u.%u.%u", ipInt, ipInt & 0xFF, (ipInt >> 8) & 0xFF, (ipInt >> 16) & 0xFF, (uint8_t)(ipInt >> 24)); return ipStr; } -<<<<<<< HEAD - -typedef struct CharsetPair { - char *oldCharset; - char *newCharset; -} CharsetPair; - -char *taosCharsetReplace(char *charsetstr) { - CharsetPair charsetRep[] = { - { "utf8", "UTF-8" }, { "936", "CP936" }, - }; - - for (int32_t i = 0; i < tListLen(charsetRep); ++i) { - if (strcasecmp(charsetRep[i].oldCharset, charsetstr) == 0) { - return strdup(charsetRep[i].newCharset); - } - } - - return strdup(charsetstr); -} -======= #ifndef CLUSTER void taosCleanupTier() {} @@ -549,4 +528,3 @@ char *taosCharsetReplace(char *charsetstr) { return strdup(charsetstr); } ->>>>>>> release/v1.6.4.0 From eaa767be1bc459de0cd239b2bf1eed187a83bed8 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 1 Dec 2019 21:34:39 +0800 Subject: [PATCH 04/16] [tbase-1269] --- src/system/detail/src/mgmtSupertableQuery.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/system/detail/src/mgmtSupertableQuery.c b/src/system/detail/src/mgmtSupertableQuery.c index f5e0509c24..4dc7760d89 100644 --- a/src/system/detail/src/mgmtSupertableQuery.c +++ b/src/system/detail/src/mgmtSupertableQuery.c @@ -56,7 +56,7 @@ static int32_t tabObjVGIDComparator(const void* pLeft, const void* pRight) { // monotonic inc in memory address static int32_t tabObjPointerComparator(const void* pLeft, const void* pRight) { - int64_t ret = (int64_t)pLeft - (int64_t)pRight; + int64_t ret = (*(STabObj**)(pLeft))->uid - (*(STabObj**)(pRight))->uid; if (ret == 0) { return 0; } else { @@ -427,11 +427,11 @@ static tQueryResultset* doNestedLoopIntersect(tQueryResultset* pRes1, tQueryResu } static tQueryResultset* doSortIntersect(tQueryResultset* pRes1, tQueryResultset* pRes2) { - size_t sizePtr = sizeof(void*); - + size_t sizePtr = sizeof(void *); + qsort(pRes1->pRes, pRes1->num, sizePtr, tabObjPointerComparator); qsort(pRes2->pRes, pRes2->num, sizePtr, tabObjPointerComparator); - + int32_t i = 0; int32_t j = 0; From 167d6cb70edc8a92d32f804eaa5f51d30d28c95e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 2 Dec 2019 15:47:18 +0800 Subject: [PATCH 05/16] fix #813 --- src/system/detail/src/vnodeImport.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 2a0fda29f9..f0019a92ee 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -466,8 +466,6 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int SCompBlock *pBlock = pHandle->pBlocks + blockId; *code = TSDB_CODE_SUCCESS; - assert(pBlock->sversion == pObj->sversion); - SVnodeObj *pVnode = vnodeList + pObj->vnode; int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd; @@ -989,6 +987,13 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } } + int aslot = MIN(blockIter.slot, importHandle.compInfo.numOfBlocks - 1); + int64_t sversion = importHandle.pBlocks[aslot].sversion; + if (sversion != pObj->sversion) { + code = TSDB_CODE_OTHERS; + goto _error_merge; + } + // Open the new .t file if not opened yet. if (pVnode->nfd <= 0) { if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) { From e593c12cef6b7247d5d7ed24d372182fdbc31ee4 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 16:18:32 +0800 Subject: [PATCH 06/16] Instructions for using the RESTful interface --- .../webdocs/markdowndocs/connector-ch.md | 126 ++++++++++++++++-- 1 file changed, 116 insertions(+), 10 deletions(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index 9261b9eef6..e629a44bf1 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -316,11 +316,23 @@ import taos 如:http://192.168.0.1:6020/rest/sql 是指向IP地址为192.168.0.1的URL. -HTTP请求的Header里需带有身份认证信息,TDengine单机版仅支持Basic认证机制。 +HTTP请求的Header里需带有身份认证信息,TDengine支持Basic认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。 + +- 自定义身份认证信息如下所示(稍后介绍) + +``` +Authorization: Taosd +``` + +- Basic身份认证信息如下所示 + +``` +Authorization: Basic +``` HTTP请求的BODY里就是一个完整的SQL语句,SQL语句中的数据表应提供数据库前缀,例如\.\。如果表名不带数据库前缀,系统会返回错误。因为HTTP模块只是一个简单的转发,没有当前DB的概念。 -使用curl来发起一个HTTP Request, 语法如下: +使用curl通过自定义身份认证方式来发起一个HTTP Request, 语法如下: ``` curl -H 'Authorization: Basic ' -d '' :/rest/sql @@ -332,7 +344,7 @@ curl -H 'Authorization: Basic ' -d '' :/rest/sql curl -u username:password -d '' :/rest/sql ``` -其中,`TOKEN`为`{username}:{password}`经过Base64编码之后的字符串,例如`root:taosdata`编码后为`cm9vdDp0YW9zZGF0YQ==` +其中,`TOKEN`为`{username}:{password}`经过Base64编码之后的字符串, 例如`root:taosdata`编码后为`cm9vdDp0YW9zZGF0YQ==` ### HTTP返回格式 @@ -356,21 +368,55 @@ curl -u username:password -d '' :/rest/sql - 第三行是具体返回的数据,一排一排的呈现。如果不返回结果集,仅[[affected_rows]] - 第四行”rows”表明总共多少行数据 +### 自定义授权码 + +HTTP请求中需要带有授权码``, 用于身份识别。授权码通常由管理员提供, 可简单的通过发送`HTTP GET`请求来获取授权码, 操作如下: + +``` +curl http://:6020/rest/login// +``` + +其中, `ip`是TDengine数据库的IP地址, `username`为数据库用户名, `password`为数据库密码, 返回值为`JSON`格式, 各字段含义如下: + +- status:请求结果的标志位 + +- code:返回值代码 + +- desc: 授权码 + +获取授权码示例: + +``` +curl http://192.168.0.1:6020/rest/login/root/taosdata +``` + +返回值: + +``` +{ + "status": "succ", + "code": 0, + "desc": +"/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04" +} +``` + ### 使用示例 - 在demo库里查询表t1的所有记录, curl如下: - `curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sql` - - 返回值: +``` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sql` +``` +返回值: ``` { "status": "succ", "head": ["column1","column2","column3"], "data": [ - ["2017-12-12 23:44:25.730", 1, 2.3], - ["2017-12-12 22:44:25.728", 4, 5.6] + ["2017-12-12 22:44:25.728",4,5.60000], + ["2017-12-12 23:44:25.730",1,2.30000] ], "rows": 2 } @@ -378,9 +424,11 @@ curl -u username:password -d '' :/rest/sql - 创建库demo: - `curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 192.168.0.1:6020/rest/sql` +``` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 192.168.0.1:6020/rest/sql` +``` - 返回值: +返回值: ``` { "status": "succ", @@ -390,6 +438,64 @@ curl -u username:password -d '' :/rest/sql } ``` +### 其他用法 + +#### 结果集采用Unix时间戳 + +HTTP请求URL采用`sqlt`时,返回结果集的时间戳将采用Unix时间戳格式表示,例如 + +``` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlt` +``` + +返回值: + +``` +{ + "status": "succ", + "head": ["column1","column2","column3"], + "data": [ + [1513089865728,4,5.60000], + [1513093465730,1,2.30000] + ], + "rows": 2 +} +``` + +#### 结果集采用UTC时间字符串 + +HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间字符串表示,例如 +``` + curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlutc` +``` + +返回值: + +``` +{ + "status": "succ", + "head": ["column1","column2","column3"], + "data": [ + ["2017-12-12T22:44:25.728+0800",4,5.60000], + ["2017-12-12T23:44:25.730+0800",1,2.30000] + ], + "rows": 2 +} +``` + +### 重要配置项 + +下面仅列出一些与RESTFul接口有关的配置参数,其他系统参数请看配置文件里的说明。注意:配置修改后,需要重启taosd服务才能生效 + +- httpIp: 对外提供RESTFul服务的IP地址,默认绑定到0.0.0.0 +- httpPort: 对外提供RESTFul服务的端口号,默认绑定到6020 +- httpMaxThreads: 启动的线程数量,默认为2 +- httpCacheSessions: 缓存连接的数量,并发请求数目需小于此数值的10倍,默认值为100 +- restfulRowLimit: 返回结果集(JSON格式)的最大条数,默认值为10240 +- httpEnableCompress: 是否支持压缩,默认不支持,目前TDengine仅支持gzip压缩格式 +- httpDebugFlag: 日志开关,131:仅错误和报警信息,135:所有,默认131 + + ## Go Connector #### 安装TDengine From 0517feb1fd1c52733ca8683b78c913a7eb594a43 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 16:31:38 +0800 Subject: [PATCH 07/16] Instructions for using the RESTful interface --- .../webdocs/markdowndocs/connector-ch.md | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index e629a44bf1..85596f2053 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -306,15 +306,16 @@ import taos ### HTTP请求格式 -​ `http://:/rest/sql` +``` +http://:/rest/sql +``` -​ 参数说明: +​参数说明: -​ IP: 集群中的任一台主机 +- IP: 集群中的任一台主机 +- PORT: 配置文件中httpPort配置项,缺省为6020 -​ PORT: 配置文件中httpPort配置项,缺省为6020 - -如:http://192.168.0.1:6020/rest/sql 是指向IP地址为192.168.0.1的URL. +例如:http://192.168.0.1:6020/rest/sql 是指向IP地址为192.168.0.1的URL. HTTP请求的Header里需带有身份认证信息,TDengine支持Basic认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。 @@ -348,7 +349,8 @@ curl -u username:password -d '' :/rest/sql ### HTTP返回格式 -返回值为JSON格式,如下: +返回值为JSON格式,如下: + ``` { "status": "succ", @@ -363,10 +365,10 @@ curl -u username:password -d '' :/rest/sql 说明: -- 第一行”status”告知操作结果是成功还是失败; -- 第二行”head”是表的定义,如果不返回结果集,仅有一列“affected_rows”; -- 第三行是具体返回的数据,一排一排的呈现。如果不返回结果集,仅[[affected_rows]] -- 第四行”rows”表明总共多少行数据 +- status: 告知操作结果是成功还是失败 +- head: 表的定义,如果不返回结果集,仅有一列“affected_rows” +- data: 具体返回的数据,一排一排的呈现,如果不返回结果集,仅[[affected_rows]] +- rows: 表明总共多少行数据 ### 自定义授权码 @@ -403,7 +405,7 @@ curl http://192.168.0.1:6020/rest/login/root/taosdata ### 使用示例 -- 在demo库里查询表t1的所有记录, curl如下: +- 在demo库里查询表t1的所有记录: ``` curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sql` @@ -445,7 +447,7 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create database demo' 19 HTTP请求URL采用`sqlt`时,返回结果集的时间戳将采用Unix时间戳格式表示,例如 ``` -curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlt` +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlt ``` 返回值: @@ -466,7 +468,7 @@ curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 1 HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间字符串表示,例如 ``` - curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlutc` + curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'select * from demo.t1' 192.168.0.1:6020/rest/sqlutc ``` 返回值: From e5418dd3e13370be8a5d5b7518e5069472389e58 Mon Sep 17 00:00:00 2001 From: haojun Liao Date: Mon, 2 Dec 2019 18:37:51 +0800 Subject: [PATCH 08/16] Update TAOS SQL-ch.md Add some detailed description with respect to the time unit conversion for interval clause. --- documentation/webdocs/markdowndocs/TAOS SQL-ch.md | 1 + 1 file changed, 1 insertion(+) diff --git a/documentation/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation/webdocs/markdowndocs/TAOS SQL-ch.md index 6a8549bbd2..347ac4f21f 100644 --- a/documentation/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation/webdocs/markdowndocs/TAOS SQL-ch.md @@ -18,6 +18,7 @@ TDengine提供类似SQL语法,用户可以在TDengine Shell中使用SQL语句 - 插入记录时,如果时间戳为0,插入数据时使用服务器当前时间 - Epoch Time: 时间戳也可以是一个长整数,表示从1970-01-01 08:00:00.000开始的毫秒数 - 时间可以加减,比如 now-2h,表明查询时刻向前推2个小时(最近2小时)。数字后面的时间单位:a(毫秒), s(秒), m(分), h(小时), d(天),w(周), n(月), y(年)。比如select * from t1 where ts > now-2w and ts <= now-1w, 表示查询两周前整整一周的数据 +- TDengine暂不支持时间窗口按照自然年和自然月切分。Where条件中的时间窗口单位的换算关系如下:interval(1y) 等效于 interval(365d), interval(1n) 等效于 interval(30d), interval(1w) 等效于 interval(7d) TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMicrosecond就可支持微秒。 From d468cc5eb2d5187c06982b4bcec11fb195101a54 Mon Sep 17 00:00:00 2001 From: lihui Date: Mon, 2 Dec 2019 19:01:20 +0800 Subject: [PATCH 09/16] [TBASE-1271] --- packaging/deb/makedeb.sh | 2 +- packaging/rpm/makerpm.sh | 27 ++++++++++++++++++++++++--- packaging/rpm/tdengine.spec | 2 +- packaging/tools/makeclient.sh | 4 ++-- packaging/tools/makepkg.sh | 4 ++-- src/kit/shell/src/shellEngine.c | 14 ++++++++++++-- src/kit/shell/src/shellLinux.c | 9 +++++++++ src/system/detail/src/dnodeService.c | 7 ++++++- 8 files changed, 57 insertions(+), 12 deletions(-) diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index 5766bd6836..501a06eddb 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -63,7 +63,7 @@ debver="Version: "$tdengine_ver sed -i "2c$debver" ${pkg_dir}/DEBIAN/control #get taos version, then set deb name -debname="tdengine-"${tdengine_ver}".deb" +debname="TDengine-"${tdengine_ver}".deb" # make deb package dpkg -b ${pkg_dir} $debname diff --git a/packaging/rpm/makerpm.sh b/packaging/rpm/makerpm.sh index 3b0f1d098e..e301f5ece9 100755 --- a/packaging/rpm/makerpm.sh +++ b/packaging/rpm/makerpm.sh @@ -2,6 +2,9 @@ # # Generate rpm package for centos +#set -e +#set -x + #curr_dir=$(pwd) compile_dir=$1 output_dir=$2 @@ -24,8 +27,25 @@ if command -v sudo > /dev/null; then csudo="sudo" fi +function cp_rpm_package() { +local cur_dir +cd $1 +cur_dir=$(pwd) + +for dirlist in $(ls ${cur_dir}); do + if test -d ${dirlist}; then + cd ${dirlist} + cp_rpm_package ${cur_dir}/${dirlist} + cd .. + fi + if test -e ${dirlist}; then + cp ${cur_dir}/${dirlist} ${output_dir}/TDengine-${tdengine_ver}.rpm + fi +done +} + if [ -d ${pkg_dir} ]; then - ${csudo} rm -rf ${pkg_dir} + ${csudo} rm -rf ${pkg_dir} fi ${csudo} mkdir -p ${pkg_dir} cd ${pkg_dir} @@ -35,7 +55,8 @@ ${csudo} mkdir -p BUILD BUILDROOT RPMS SOURCES SPECS SRPMS ${csudo} rpmbuild --define="_version ${tdengine_ver}" --define="_topdir ${pkg_dir}" --define="_compiledir ${compile_dir}" -bb ${spec_file} # copy rpm package to output_dir, then clean temp dir -#echo "rmpbuild end, cur_dir: $(pwd) " -${csudo} cp -rf RPMS/* ${output_dir} +#${csudo} cp -rf RPMS/* ${output_dir} +cp_rpm_package ${pkg_dir}/RPMS + cd .. ${csudo} rm -rf ${pkg_dir} diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index c0c0eacfae..ef02fb90fc 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -2,7 +2,7 @@ %define cfg_install_dir /etc/taos %define __strip /bin/true -Name: tdengine +Name: TDengine Version: %{_version} Release: 3%{?dist} Summary: tdengine from taosdata diff --git a/packaging/tools/makeclient.sh b/packaging/tools/makeclient.sh index 7e22116ac5..b4948bb3a7 100755 --- a/packaging/tools/makeclient.sh +++ b/packaging/tools/makeclient.sh @@ -19,8 +19,8 @@ code_dir="${top_dir}/src" release_dir="${top_dir}/release" community_dir="${script_dir}/../../../community/src" -package_name='linux' -install_dir="${release_dir}/taos-client-${version}-${package_name}-$(echo ${build_time}| tr ': ' -)" +#package_name='linux' +install_dir="${release_dir}/TDengine-client-${version}" # Directories and files. bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${script_dir}/remove_client.sh" diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 24f3a0b8d1..714b74dbe6 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -16,8 +16,8 @@ build_dir="${compile_dir}/build" code_dir="${top_dir}/src" release_dir="${top_dir}/release" -package_name='linux' -install_dir="${release_dir}/taos-${version}-${package_name}-$(echo ${build_time}| tr ': ' -)" +#package_name='linux' +install_dir="${release_dir}/TDengine-${version}" # Directories and files. bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/taosdump ${script_dir}/remove.sh" diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 2eb9893556..ecda6912c3 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -29,9 +29,19 @@ #elif defined(DARWIN) char CLIENT_VERSION[] = "Welcome to the TDengine shell from mac, client version:%s "; #else - char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, client version:%s "; + #ifdef CLUSTER + char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, enterprise client version:%s "; + #else + char CLIENT_VERSION[] = "Welcome to the TDengine shell from linux, community client version:%s "; + #endif #endif -char SERVER_VERSION[] = "server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; + +#ifdef CLUSTER + char SERVER_VERSION[] = "enterprise server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; +#else + char SERVER_VERSION[] = "community server version:%s\nCopyright (c) 2017 by TAOS Data, Inc. All rights reserved.\n\n"; +#endif + char PROMPT_HEADER[] = "taos> "; char CONTINUE_PROMPT[] = " -> "; int prompt_size = 6; diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 0ece4efbb2..ad8bf6c5c3 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -105,6 +105,15 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, args_doc, doc}; void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { + char verType[32] = {0}; + #ifdef CLUSTER + sprintf(verType, "enterprise version: %s\n", version); + #else + sprintf(verType, "community version: %s\n", version); + #endif + + argp_program_version = verType; + argp_parse(&argp, argc, argv, 0, 0, arguments); if (arguments->abort) { error(10, 0, "ABORTED"); diff --git a/src/system/detail/src/dnodeService.c b/src/system/detail/src/dnodeService.c index 9764afc593..f03bd5f3bb 100644 --- a/src/system/detail/src/dnodeService.c +++ b/src/system/detail/src/dnodeService.c @@ -55,7 +55,12 @@ int main(int argc, char *argv[]) { exit(EXIT_FAILURE); } } else if (strcmp(argv[i], "-V") == 0) { - printf("version: %s compatible_version: %s\n", version, compatible_version); + #ifdef CLUSTER + printf("enterprise version: %s compatible_version: %s\n", version, compatible_version); + #else + printf("community version: %s compatible_version: %s\n", version, compatible_version); + #endif + printf("gitinfo: %s\n", gitinfo); printf("buildinfo: %s\n", buildinfo); return 0; From 2002c8e20b0774be018e4d58d7f0e92a1c2919ac Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 20:34:32 +0800 Subject: [PATCH 10/16] rror may occur when reading http chunked body --- src/modules/http/src/httpHandle.c | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/modules/http/src/httpHandle.c b/src/modules/http/src/httpHandle.c index 16e8378fb8..c736825b37 100644 --- a/src/modules/http/src/httpHandle.c +++ b/src/modules/http/src/httpHandle.c @@ -293,19 +293,14 @@ bool httpReadChunkedBody(HttpContext* pContext, HttpParser* pParser) { int httpReadUnChunkedBody(HttpContext* pContext, HttpParser* pParser) { int dataReadLen = pParser->bufsize - (int)(pParser->data.pos - pParser->buffer); if (dataReadLen > pParser->data.len) { - httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, dataReadLen:%d > pContext->data.len:%d", - pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); + httpError("context:%p, fd:%d, ip:%s, un-chunked body length invalid, read size:%d dataReadLen:%d > pContext->data.len:%d", + pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); httpSendErrorResp(pContext, HTTP_PARSE_BODY_ERROR); return HTTP_CHECK_BODY_ERROR; } else if (dataReadLen < pParser->data.len) { - httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, dataReadLen:%d < pContext->data.len:%d, continue read", - pContext, pContext->fd, pContext->ipstr, dataReadLen, pParser->data.len); - if (!httpReadDataImp(pContext)) { - httpError("context:%p, fd:%d, ip:%s, read chunked request error", pContext, pContext->fd, pContext->ipstr); - return HTTP_CHECK_BODY_ERROR; - } else { - return HTTP_CHECK_BODY_CONTINUE; - } + httpTrace("context:%p, fd:%d, ip:%s, un-chunked body not finished, read size:%d dataReadLen:%d < pContext->data.len:%d, continue read", + pContext, pContext->fd, pContext->ipstr, pContext->parser.bufsize, dataReadLen, pParser->data.len); + return HTTP_CHECK_BODY_CONTINUE; } else { return HTTP_CHECK_BODY_SUCCESS; } From 387a049e388b9e9a83d7033431832db59730765c Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 2 Dec 2019 22:27:38 +0800 Subject: [PATCH 11/16] version.c --- src/util/src/version.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/util/src/version.c b/src/util/src/version.c index ed83066f80..4eab3fb7e2 100644 --- a/src/util/src/version.c +++ b/src/util/src/version.c @@ -1,4 +1,4 @@ -char version[64] = "1.6.4.0"; +char version[64] = "1.6.4.1"; char compatible_version[64] = "1.6.1.0"; -char gitinfo[128] = "6d27c11e3b23ae69366df366a6517853648c41f7"; -char buildinfo[512] = "Built by ubuntu at 2019-12-01 12:27"; +char gitinfo[128] = "893fac9da79ef9b88355fcd18d29057adf909bbd"; +char buildinfo[512] = "Built by ubuntu at 2019-12-02 22:21"; From 13298fe5d6b5c7d48735139a42a375e2f6c301e6 Mon Sep 17 00:00:00 2001 From: lihui Date: Tue, 3 Dec 2019 13:54:33 +0800 Subject: [PATCH 12/16] [TBASE-816] --- CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a1b2c16f46..71cb0bfd43 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -146,7 +146,7 @@ IF (NOT DEFINED TD_CLUSTER) SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -Wno-char-subscripts -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ENDIF () ELSE () - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -Wno-char-subscripts -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -Wno-char-subscripts -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ENDIF () ADD_DEFINITIONS(-DLINUX) ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT) @@ -156,7 +156,7 @@ IF (NOT DEFINED TD_CLUSTER) ENDIF () SET(DEBUG_FLAGS "-O0 -DDEBUG") SET(RELEASE_FLAGS "-O0") - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -Wno-char-subscripts -fsigned-char -munaligned-access -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -Wno-char-subscripts -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ADD_DEFINITIONS(-DLINUX) ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT) ADD_DEFINITIONS(-DUSE_LIBICONV) From 9a659e3b4d9ea6db7b03e38f50287acbc28bd109 Mon Sep 17 00:00:00 2001 From: lihui Date: Tue, 3 Dec 2019 15:11:04 +0800 Subject: [PATCH 13/16] [TBASE-816] --- packaging/deb/makedeb.sh | 12 +++++++++++- packaging/release.sh | 4 ++-- packaging/rpm/makerpm.sh | 7 +++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index 501a06eddb..17bb5aabae 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -7,6 +7,7 @@ compile_dir=$1 output_dir=$2 tdengine_ver=$3 +armver=$4 script_dir="$(dirname $(readlink -f $0))" top_dir="$(readlink -m ${script_dir}/../..)" @@ -63,7 +64,16 @@ debver="Version: "$tdengine_ver sed -i "2c$debver" ${pkg_dir}/DEBIAN/control #get taos version, then set deb name -debname="TDengine-"${tdengine_ver}".deb" +if [ -z "$armver" ]; then + debname="TDengine-"${tdengine_ver}".deb" +elif [ "$armver" == "arm64" ]; then + debname="TDengine-"${tdengine_ver}"-arm64.deb" +elif [ "$armver" == "arm32" ]; then + debname="TDengine-"${tdengine_ver}-arm32".deb" +else + echo "input parameter error!!!" + return +fi # make deb package dpkg -b ${pkg_dir} $debname diff --git a/packaging/release.sh b/packaging/release.sh index 58f69589d8..0c806a159f 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -149,7 +149,7 @@ if [ -d ${output_dir} ]; then fi ${csudo} mkdir -p ${output_dir} cd ${script_dir}/deb -${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${version} +${csudo} ./makedeb.sh ${compile_dir} ${output_dir} ${version} ${armver} echo "do rpm package for the centos system" output_dir="${top_dir}/rpms" @@ -158,7 +158,7 @@ if [ -d ${output_dir} ]; then fi ${csudo} mkdir -p ${output_dir} cd ${script_dir}/rpm -${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${version} +${csudo} ./makerpm.sh ${compile_dir} ${output_dir} ${version} ${armver} echo "do tar.gz package for all systems" cd ${script_dir}/tools diff --git a/packaging/rpm/makerpm.sh b/packaging/rpm/makerpm.sh index e301f5ece9..aef01875cb 100755 --- a/packaging/rpm/makerpm.sh +++ b/packaging/rpm/makerpm.sh @@ -9,6 +9,7 @@ compile_dir=$1 output_dir=$2 tdengine_ver=$3 +armver=$4 script_dir="$(dirname $(readlink -f $0))" top_dir="$(readlink -m ${script_dir}/../..)" @@ -58,5 +59,11 @@ ${csudo} rpmbuild --define="_version ${tdengine_ver}" --define="_topdir ${pkg_di #${csudo} cp -rf RPMS/* ${output_dir} cp_rpm_package ${pkg_dir}/RPMS +if [ "$armver" == "arm64" ]; then + mv ${output_dir}/TDengine-${tdengine_ver}.rpm ${output_dir}/TDengine-${tdengine_ver}-arm64.rpm +elif [ "$armver" == "arm32" ]; then + mv ${output_dir}/TDengine-${tdengine_ver}.rpm ${output_dir}/TDengine-${tdengine_ver}-arm32.rpm +fi + cd .. ${csudo} rm -rf ${pkg_dir} From 8fb2d0c03eb435865e50c4432c27b5b0aae3f6b6 Mon Sep 17 00:00:00 2001 From: lihui Date: Tue, 3 Dec 2019 15:22:35 +0800 Subject: [PATCH 14/16] [TBASE-816] --- packaging/deb/makedeb.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index 17bb5aabae..5c2df734fa 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -69,7 +69,7 @@ if [ -z "$armver" ]; then elif [ "$armver" == "arm64" ]; then debname="TDengine-"${tdengine_ver}"-arm64.deb" elif [ "$armver" == "arm32" ]; then - debname="TDengine-"${tdengine_ver}-arm32".deb" + debname="TDengine-"${tdengine_ver}"-arm32.deb" else echo "input parameter error!!!" return From bff25a36a47cabdb85dd3ece84c900c3318c3c6d Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 3 Dec 2019 17:37:10 +0800 Subject: [PATCH 15/16] C # usage --- .../webdocs/markdowndocs/connector-ch.md | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index 85596f2053..e96f6f21d2 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -502,7 +502,7 @@ HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间 #### 安装TDengine -Go的链接器使用了到了 libtaos.so 和taos.h,因此,在使用Go连接器之前,需要在程序运行的机器上安装TDengine以获得相关的驱动文件。 +Go的连接器使用到了 libtaos.so 和taos.h,因此,在使用Go连接器之前,需要在程序运行的机器上安装TDengine以获得相关的驱动文件。 #### Go语言引入package TDengine提供了GO驱动程序“taosSql”包。taosSql驱动包是基于GO的“database/sql/driver”接口的实现。用户可以通过`go get`命令来获取驱动包。 @@ -561,3 +561,37 @@ taosSql驱动包内采用cgo模式,调用了TDengine的C/C++同步接口,与 3. 创建表、写入和查询数据 在创建好了数据库后,就可以开始创建表和写入查询数据了。这些操作的基本思路都是首先组装SQL语句,然后调用db.Exec执行,并检查错误信息和执行相应的处理。可以参考上面的样例代码 + +## CSharp Connector + +在Windows系统上,C#应用程序可以使用TDengine的原生C接口来执行所有数据库操作,后续版本将提供ORM(dapper)框架驱动。 + +#### 安装TDengine客户端 + +C#连接器需要使用`libtaos.so`和`taos.h`。因此,在使用C#连接器之前,需在程序运行的Windows环境安装TDengine的Windows客户端,以便获得相关驱动文件。 + +安装完成后,在文件夹`C:/TDengine/examples/C#`中,将会看到两个文件 + +- TDengineDriver.cs 调用taos.dll文件的Native C方法 +- TDengineTest.cs 参考程序示例 + +在文件夹`C:\Windows\System32`,将会看到`taos.dll`文件 + +#### 使用方法 + +- 将C#接口文件TDengineDriver.cs加入到应用程序所在.NET项目中 +- 参考TDengineTest.cs来定义数据库连接参数,及执行数据插入、查询等操作的方法 +- 因为C#接口需要用到`taos.dll`文件,用户可以将`taos.dll`文件加入.NET解决方案中 + +#### 注意事项 + +- `taos.dll`文件使用x64平台编译,所以.NET项目在生成.exe文件时,“解决方案”/“项目”的“平台”请均选择“x64”。 +- 此.NET接口目前已经在Visual Studio 2013/2015/2017中验证过,其它VS版本尚待验证。 + +#### 第三方驱动 + +Maikebing.Data.Taos是一个基于TDengine的RESTful Connector构建的ADO.Net提供器,该开发包由热心贡献者`麦壳饼@@maikebing`提供,具体请参考 + +``` +https://gitee.com/maikebing/Maikebing.EntityFrameworkCore.Taos +``` From ee33d78494db839cd5c457af5f7926158feabedc Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 3 Dec 2019 19:00:24 +0800 Subject: [PATCH 16/16] windows client user manual --- .../webdocs/markdowndocs/connector-ch.md | 82 ++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/documentation/webdocs/markdowndocs/connector-ch.md b/documentation/webdocs/markdowndocs/connector-ch.md index e96f6f21d2..ce996f4876 100644 --- a/documentation/webdocs/markdowndocs/connector-ch.md +++ b/documentation/webdocs/markdowndocs/connector-ch.md @@ -310,7 +310,7 @@ import taos http://:/rest/sql ``` -​参数说明: +参数说明: - IP: 集群中的任一台主机 - PORT: 配置文件中httpPort配置项,缺省为6020 @@ -595,3 +595,83 @@ Maikebing.Data.Taos是一个基于TDengine的RESTful Connector构建的ADO.Net ``` https://gitee.com/maikebing/Maikebing.EntityFrameworkCore.Taos ``` + +## Windows客户端及程序接口 + +### 客户端安装 + +在Windows操作系统下,TDengine提供64位的Windows客户端,客户端安装程序为.exe文件,运行该文件即可安装,安装路径为C:\TDengine。Windows的客户端可运行在主流的64位Windows平台之上,客户端目录结构如下: + +``` +├── cfg +│   └── taos.cfg +├── connector +│   ├── go +│   ├── grafana +│   ├── jdbc +│   └── python +├── driver +│   ├── taos.dll +│   ├── taos.exp +│   └── taos.lib +├── examples +│   ├── bash +│   ├── c +│   ├── C# +│   ├── go +│   ├── JDBC +│   ├── lua +│   ├── matlab +│   ├── nodejs +│   ├── python +│   ├── R +│   └── rust +├── include +│   └── taos.h +└── taos.exe +``` + +其中,最常用的文件列出如下: + ++ Client可执行文件: C:/TDengine/taos.exe ++ 配置文件: C:/TDengine/cfg/taos.cfg ++ C驱动程序目录: C:/TDengine/driver ++ C驱动程序头文件: C:/TDengine/include ++ JDBC驱动程序目录: C:/TDengine/connector/jdbc ++ GO驱动程序目录:C:/TDengine/connector/go ++ Python驱动程序目录:C:/TDengine/connector/python ++ C#驱动程序及示例代码: C:/TDengine/examples/C# ++ 日志目录(第一次运行程序时生成):C:/TDengine/log + +### 注意事项 + +#### Shell工具注意事项 + +在开始菜单中搜索cmd程序,通过命令行方式执行taos.exe即可打开TDengine的Client程序,如下所示,其中ServerIP为TDengine所在Linux服务器的IP地址 + +``` +taos -h +``` + +在cmd中对taos的使用与Linux平台没有差别,但需要注意以下几点: + ++ 确保Windows防火墙或者其他杀毒软件处于关闭状态,TDengine的服务端与客户端通信的端口请参考`服务端配置`章节 ++ 确认客户端连接时指定了正确的服务器IP地址 ++ ping服务器IP,如果没有反应,请检查你的网络 + +#### C++接口注意事项 + +TDengine在Window系统上提供的API与Linux系统是相同的, 应用程序使用时,需要包含TDengine头文件taos.h,连接时需要链接TDengine库taos.lib,运行时将taos.dll放到可执行文件目录下。 + +#### JDBC接口注意事项 + +在Windows系统上,应用程序可以使用JDBC接口来操纵数据库,使用JDBC接口的注意事项如下: + ++ 将JDBC驱动程序(JDBCDriver-1.0.0-dist.jar)放置到当前的CLASS_PATH中; + ++ 将Windows开发包(taos.dll)放置到system32目录下。 + + + + +