Merge branch 'develop' into add-test-execution-in-ci

This commit is contained in:
Shuduo Sang 2020-04-07 16:39:18 +08:00
commit b8c25f148c
58 changed files with 718 additions and 517 deletions

View File

@ -61,6 +61,9 @@ script:
grep failed out.txt grep failed out.txt
total_failed=`grep failed out.txt | wc -l` total_failed=`grep failed out.txt | wc -l`
echo "Total $total_failed failed" echo "Total $total_failed failed"
if [ "$total_failed" -ne "0" ]; then
exit $total_failed
fi
;; ;;
esac esac

View File

@ -3,6 +3,8 @@ PROJECT(TDengine)
SET(TD_CLUSTER FALSE) SET(TD_CLUSTER FALSE)
SET(TD_ACCOUNT FALSE) SET(TD_ACCOUNT FALSE)
SET(TD_VPEER FALSE)
SET(TD_MPEER FALSE)
SET(TD_GRANT FALSE) SET(TD_GRANT FALSE)
SET(TD_COVER FALSE) SET(TD_COVER FALSE)
SET(TD_PAGMODE_LITE FALSE) SET(TD_PAGMODE_LITE FALSE)

View File

@ -107,12 +107,12 @@ IF (TD_LINUX_64)
SET(RELEASE_FLAGS "-O0") SET(RELEASE_FLAGS "-O0")
IF (NOT TD_ARM) IF (NOT TD_ARM)
IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang") IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE () ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF () ENDIF ()
ELSE () ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF () ENDIF ()
ADD_DEFINITIONS(-DLINUX) ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT) ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
@ -128,7 +128,7 @@ IF (TD_LINUX_64)
ENDIF () ENDIF ()
SET(DEBUG_FLAGS "-O0 -DDEBUG") SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0") SET(RELEASE_FLAGS "-O0")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DLINUX) ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT) ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
@ -141,7 +141,7 @@ IF (TD_LINUX_64)
ELSEIF (TD_WINDOWS_64) ELSEIF (TD_WINDOWS_64)
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE) SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
IF (NOT TD_GODLL) IF (NOT TD_GODLL)
SET(COMMON_FLAGS "/nologo /WX- /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-") SET(COMMON_FLAGS "/nologo /WX /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
SET(DEBUG_FLAGS "/Zi /W3 /GL") SET(DEBUG_FLAGS "/Zi /W3 /GL")
SET(RELEASE_FLAGS "/W0 /GL") SET(RELEASE_FLAGS "/W0 /GL")
ENDIF () ENDIF ()
@ -151,7 +151,7 @@ IF (TD_LINUX_64)
ADD_DEFINITIONS(-DPTW32_BUILD) ADD_DEFINITIONS(-DPTW32_BUILD)
ADD_DEFINITIONS(-D_MBCS -D_CRT_SECURE_NO_DEPRECATE -D_CRT_NONSTDC_NO_DEPRECATE) ADD_DEFINITIONS(-D_MBCS -D_CRT_SECURE_NO_DEPRECATE -D_CRT_NONSTDC_NO_DEPRECATE)
ELSEIF (TD_DARWIN_64) ELSEIF (TD_DARWIN_64)
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -DDEBUG") SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0") SET(RELEASE_FLAGS "-O0")
ADD_DEFINITIONS(-DDARWIN) ADD_DEFINITIONS(-DDARWIN)

View File

@ -213,8 +213,6 @@ typedef struct SDataBlockList {
int32_t idx; int32_t idx;
uint32_t nSize; uint32_t nSize;
uint32_t nAlloc; uint32_t nAlloc;
char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */
STableDataBlocks **pData; STableDataBlocks **pData;
} SDataBlockList; } SDataBlockList;
@ -451,7 +449,6 @@ void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);

View File

@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql = sToken.z; sql = sToken.z;
} }
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
if (pSql->asyncTblPos == NULL) {
assert(code == TSDB_CODE_ACTION_IN_PROGRESS);
}
} }
int32_t len = cend - cstart + 1; int32_t len = cend - cstart + 1;
@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/* /*
* For async insert, after get the metermeta from server, the sql string will not be * For async insert, after get the table meta from server, the sql string will not be
* parsed using the new metermeta to avoid the overhead cause by get metermeta data information. * parsed using the new table meta to avoid the overhead cause by get table meta data information.
* And during the getMeterMetaCallback function, the sql string will be parsed from the * And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position. * interrupted position.
*/ */

View File

@ -5280,8 +5280,8 @@ int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) { int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
char msg[512] = {0}; char msg[512] = {0};
if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 1)) { if (pCreate->commitLog != -1 && (pCreate->commitLog < 0 || pCreate->commitLog > 2)) {
snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); snprintf(msg, tListLen(msg), "invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
} }

View File

@ -210,7 +210,7 @@ char* tsGetTagsValue(STableMeta* pTableMeta) {
} }
// todo refactor // todo refactor
static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) { __attribute__ ((unused))static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delim) { while (*input != 0 && *input++ != delim) {
}; };
@ -218,7 +218,7 @@ static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
return input; return input;
} }
static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { __attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
size_t len = 0; size_t len = 0;
while (*src != delimiter && *src != 0) { while (*src != delimiter && *src != 0) {
*dst++ = *src++; *dst++ = *src++;

View File

@ -1835,7 +1835,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
} }
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode); pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
} }
SSchema* pSchema = pMetaMsg->schema; SSchema* pSchema = pMetaMsg->schema;
@ -2399,8 +2399,8 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
if (pTableMetaInfo->pTableMeta != NULL) { if (pTableMetaInfo->pTableMeta != NULL) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns, tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
tinfo.numOfTags); tinfo.numOfTags, pTableMetaInfo->pTableMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -108,7 +108,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
if (pSql == NULL) { if (pSql == NULL) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("failed to allocate SSqlObj for subscription"); tscError("failed to allocate SSqlObj for subscription");
goto failed; goto _pSql_failed;
} }
pSql->signature = pSql; pSql->signature = pSql;
@ -137,13 +137,11 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
return pSub; return pSub;
failed: failed:
if (sqlstr != NULL) { tfree(sqlstr);
free(sqlstr);
} _pSql_failed:
if (pSql != NULL) { tfree(pSql);
free(pSql); tfree(pSub);
}
free(pSub);
return NULL; return NULL;
} }

View File

@ -1454,7 +1454,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) { if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex); trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex);
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
@ -1470,7 +1470,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) { if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
trsupport->subqueryIndex, pState->code); trsupport->subqueryIndex, pState->code);
} else { } else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
@ -1481,7 +1481,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} else { // success, proceed to retrieve data from dnode } else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) { if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
trsupport->subqueryIndex); trsupport->subqueryIndex);
} else { } else {
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,

View File

@ -110,7 +110,8 @@ typedef struct SDataCol {
int bytes; int bytes;
int len; int len;
int offset; int offset;
void * pData; void * pData; // Original data
void * pCData; // Compressed data
} SDataCol; } SDataCol;
typedef struct { typedef struct {

View File

@ -317,14 +317,17 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
pCols->numOfCols = schemaNCols(pSchema); pCols->numOfCols = schemaNCols(pSchema);
pCols->cols[0].pData = pCols->buf; pCols->cols[0].pData = pCols->buf;
int offset = TD_DATA_ROW_HEAD_SIZE;
for (int i = 0; i < schemaNCols(pSchema); i++) { for (int i = 0; i < schemaNCols(pSchema); i++) {
if (i > 0) { if (i > 0) {
pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints; pCols->cols[i].pData = (char *)(pCols->cols[i - 1].pData) + schemaColAt(pSchema, i - 1)->bytes * pCols->maxPoints;
} }
pCols->cols[i].type = colType(schemaColAt(pSchema, i)); pCols->cols[i].type = colType(schemaColAt(pSchema, i));
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i)); pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i)); pCols->cols[i].offset = offset;
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i)); pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
offset += TYPE_BYTES[pCols->cols[i].type];
} }
} }
@ -343,7 +346,6 @@ void tdResetDataCols(SDataCols *pCols) {
} }
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
TSKEY key = dataRowKey(row);
for (int i = 0; i < pCols->numOfCols; i++) { for (int i = 0; i < pCols->numOfCols; i++) {
SDataCol *pCol = pCols->cols + i; SDataCol *pCol = pCols->cols + i;
memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes); memcpy((void *)((char *)(pCol->pData) + pCol->len), dataRowAt(row, pCol->offset), pCol->bytes);
@ -379,3 +381,7 @@ static int tdFLenFromSchema(STSchema *pSchema) {
return ret; return ret;
} }
int tdMergeDataCols(SDataCols *target, SDataCols *source) {
return 0;
}

View File

@ -6,7 +6,7 @@
#include "ttokendef.h" #include "ttokendef.h"
// todo refactor // todo refactor
static FORCE_INLINE const char* skipSegments(const char* input, char delim, int32_t num) { __attribute__((unused)) static FORCE_INLINE const char* skipSegments(const char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delim) { while (*input != 0 && *input++ != delim) {
}; };
@ -14,7 +14,7 @@ static FORCE_INLINE const char* skipSegments(const char* input, char delim, int3
return input; return input;
} }
static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { __attribute__((unused)) static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
size_t len = 0; size_t len = 0;
while (*src != delimiter && *src != 0) { while (*src != delimiter && *src != 0) {
*dst++ = *src++; *dst++ = *src++;

View File

@ -6,6 +6,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
@ -26,6 +27,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(taosd cluster) TARGET_LINK_LIBRARIES(taosd cluster)
ENDIF () ENDIF ()
IF (TD_VPEER)
TARGET_LINK_LIBRARIES(taosd balance)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target") SET(PREPARE_ENV_TARGET "prepare_env_target")
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}

View File

@ -28,9 +28,11 @@
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "mgmtGrant.h"
static int32_t dnodeInitSystem(); static int32_t dnodeInitSystem();
static int32_t dnodeInitStorage(); static int32_t dnodeInitStorage();
extern void grantParseParameter();
static void dnodeCleanupStorage(); static void dnodeCleanupStorage();
static void dnodeCleanUpSystem(); static void dnodeCleanUpSystem();
static void dnodeSetRunStatus(SDnodeRunStatus status); static void dnodeSetRunStatus(SDnodeRunStatus status);
@ -77,7 +79,7 @@ int32_t main(int32_t argc, char *argv[]) {
} }
/* Set termination handler. */ /* Set termination handler. */
struct sigaction act; struct sigaction act = {0};
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
act.sa_sigaction = signal_handler; act.sa_sigaction = signal_handler;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);

View File

@ -104,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static int dnodeGetVnodeList(int32_t vnodeList[]) { static int32_t dnodeGetVnodeList(int32_t vnodeList[]) {
DIR *dir = opendir(tsVnodeDir); DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) { if (dir == NULL) {
return TSDB_CODE_NO_WRITE_ACCESS; return TSDB_CODE_NO_WRITE_ACCESS;
@ -129,46 +129,59 @@ static int dnodeGetVnodeList(int32_t vnodeList[]) {
} }
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
char vnodeDir[TSDB_FILENAME_LEN * 3]; char vnodeDir[TSDB_FILENAME_LEN * 3];
int failed = 0; int32_t failed = 0;
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList); int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]); snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++; if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
} }
free(vnodeList); free(vnodeList);
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed); dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList); int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i) for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeClose(vnodeList[i]); vnodeClose(vnodeList[i]);
}
free(vnodeList); free(vnodeList);
dPrint("total vnodes:%d are all closed", numOfVnodes); dPrint("total vnodes:%d are all closed", numOfVnodes);
} }
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock);
pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable);
pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId);
pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId);
pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip);
}
return vnodeCreate(pCreate); return vnodeCreate(pCreate);
} }
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SMDDropVnodeMsg *pDrop = rpcMsg->pCont; SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId); pDrop->vgId = htonl(pDrop->vgId);
@ -176,7 +189,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
} }
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
@ -205,7 +217,7 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
static void dnodeSendStatusMsg(void *handle, void *tmrId) { static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) { if (tsDnodeTmr == NULL) {
dError("dnode timer is already released"); dError("dnode timer is already released");
return; return;
} }

View File

@ -15,38 +15,25 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "twal.h" #include "twal.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "queryExecutor.h"
#include "vnode.h" #include "vnode.h"
typedef struct { typedef struct {
int32_t code; SRspRet rspRet;
int32_t count; void *pCont;
int32_t numOfVnodes; int32_t contLen;
} SRpcContext; SRpcMsg rpcMsg;
typedef struct {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
SRpcContext *pRpcContext; // RPC message context
} SReadMsg; } SReadMsg;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
static void dnodeHandleIdleReadWorker(); static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
// module global variable // module global variable
static taos_qset readQset; static taos_qset readQset;
@ -55,14 +42,11 @@ static int32_t maxThreads;
static int32_t minThreads; static int32_t minThreads;
int32_t dnodeInitRead() { int32_t dnodeInitRead() {
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;
readQset = taosOpenQset(); readQset = taosOpenQset();
minThreads = 3; minThreads = 3;
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; maxThreads = tsNumOfCores * tsNumOfThreadsPerCore;
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads;
dPrint("dnode read is opened"); dPrint("dnode read is opened");
return 0; return 0;
@ -77,21 +61,21 @@ void dnodeRead(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL;
void *pVnode; void *pVnode;
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
queuedMsgNum = 0;
}
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeGetVnode(pHead->vgId); if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
pVnode = vnodeGetVnode(pHead->vgId);
} else {
pVnode = vnodeAccquireVnode(pHead->vgId);
}
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
@ -104,7 +88,6 @@ void dnodeRead(SRpcMsg *pMsg) {
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
pRead->contLen = pHead->contLen; pRead->contLen = pHead->contLen;
pRead->pRpcContext = pRpcContext;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
@ -155,6 +138,34 @@ void dnodeFreeRqueue(void *rqueue) {
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
}
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len,
.code = pRead->rspRet.code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
}
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
taos_qset qset = (taos_qset)param; taos_qset qset = (taos_qset)param;
SReadMsg *pReadMsg; SReadMsg *pReadMsg;
@ -167,13 +178,8 @@ static void *dnodeProcessReadQueue(void *param) {
continue; continue;
} }
terrno = 0; int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) { dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
(*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
} }
@ -192,118 +198,3 @@ static void dnodeHandleIdleReadWorker() {
} }
} }
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0;
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
}
// over, result can be merged now
code = pRpcContext->code;
} else {
code = terrno;
}
//TODO: query handle is returned by dnodeProcessQueryMsg
if (0) {
SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
if (pMsg->contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
vnodeRelease(pVnode);
} else {
pQInfo = pMsg->pCont;
}
qTableQuery(pQInfo); // do execute query
}
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
int32_t contLen = 0;
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg);
//vnodeRelease(pVnode);
}
}
SRpcMsg rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
vnodeRelease(pVnode);
}

View File

@ -37,7 +37,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead;
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;

View File

@ -52,7 +52,6 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
SWriteWorkerPool wWorkerPool; SWriteWorkerPool wWorkerPool;
int32_t dnodeInitWrite() { int32_t dnodeInitWrite() {
wWorkerPool.max = tsNumOfCores; wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1; if (wWorkerPool.writeWorker == NULL) return -1;
@ -71,7 +70,7 @@ void dnodeCleanupWrite() {
} }
void dnodeWrite(SRpcMsg *pMsg) { void dnodeWrite(SRpcMsg *pMsg) {
char *pCont = (char *) pMsg->pCont; char *pCont = (char *)pMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
SMsgDesc *pDesc = (SMsgDesc *)pCont; SMsgDesc *pDesc = (SMsgDesc *)pCont;
@ -80,16 +79,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
} }
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeGetWqueue(pHead->vgId); taos_queue queue = vnodeGetWqueue(pHead->vgId);
if (queue) { if (queue) {
// put message into queue // put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg; pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont; pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen; pWrite->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
} else { } else {
@ -227,4 +226,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
pthread_exit(NULL); pthread_exit(NULL);
} }
} }

View File

@ -98,7 +98,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t vnode;
uint32_t privateIp; uint32_t privateIp;
uint32_t publicIp; uint32_t publicIp;
} SVnodeGid; } SVnodeGid;

View File

@ -47,6 +47,7 @@ static STaosError errors[] = {
// rpc // rpc
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NEED_REPROCESSED, 0, 3, "action need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_ALREADY_PROCESSED, 0, 5, "message already processed") TAOS_DEFINE_ERROR(TSDB_CODE_ALREADY_PROCESSED, 0, 5, "message already processed")
TAOS_DEFINE_ERROR(TSDB_CODE_REDIRECT, 0, 6, "redirect") TAOS_DEFINE_ERROR(TSDB_CODE_REDIRECT, 0, 6, "redirect")
@ -156,28 +157,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value")
// TAOS_DEFINE_ERROR(TSDB_CODE_SYNC_REQUIRED, 0, 99, "sync required") // others
// TAOS_DEFINE_ERROR(TSDB_CODE_UNSYNCED, 0, 100, "unsyned") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format")
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_ALREADY_IMPORTED, 0, 75, "data already imported")
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_COMMIT_LOG, 0, 109, "invalid commit log") // commit log init failed
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode status")
// TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 105, "timestamp out of range")
// TAOS_DEFINE_ERROR(TSDB_CODE_DUPLICATE_TAGS, 0, 112, "duplicate tags") // tags value for join not unique
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SUBMIT_MSG, 0, 113, "invalid submit message")
// TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources")
// TAOS_DEFINE_ERROR(TSDB_CODE_FILE_BLOCK_TS_DISORDERED, 0, 108, "file block ts disordered") // time stamp in file block is disordered
// TAOS_DEFINE_ERROR(TSDB_CODE_BATCH_SIZE_TOO_BIG, 0, 104, "batch size too big")
// TAOS_DEFINE_ERROR(TSDB_CODE_WRONG_SCHEMA, 0, 53, "wrong schema")
// TAOS_DEFINE_ERROR(TSDB_CODE_NO_QSUMMARY, 0, 68, "no qsummery")
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_METER_ID, 0, 27, "invalid meter id")
// TAOS_DEFINE_ERROR(TSDB_CODE_METRICMETA_EXPIRED, 0, 63, "metricmeta expired") // local cached metric-meta expired causes error in metric query
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_ALREADY_EXIST, 0, 67, "session already exist")
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_NOT_READY, 0, 103, "session not ready") // table NOT in ready state
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_OVERFLOW, 0, 82, "data overflow")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_TRANS_NOT_FINISHED, 0, 17, "action transaction not finished")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NOT_ONLINE, 0, 18, "action not online")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_SEND_FAILD, 0, 19, "action send failed")
// TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_SESSION, 0, 20, "not active session")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };

View File

@ -241,7 +241,8 @@ typedef struct SSchema {
} SSchema; } SSchema;
typedef struct { typedef struct {
int32_t vnode; // the index of vnode int32_t vgId;
int32_t dnodeId;
uint32_t ip; uint32_t ip;
} SVnodeDesc; } SVnodeDesc;
@ -752,12 +753,12 @@ typedef struct {
typedef struct { typedef struct {
int32_t numOfQueries; int32_t numOfQueries;
SQueryDesc qdesc[]; SQueryDesc *qdesc;
} SQqueryList; } SQqueryList;
typedef struct { typedef struct {
int32_t numOfStreams; int32_t numOfStreams;
SStreamDesc sdesc[]; SStreamDesc *sdesc;
} SStreamList; } SStreamList;
typedef struct { typedef struct {

View File

@ -22,7 +22,9 @@ extern "C" {
typedef struct { typedef struct {
int len; int len;
int code;
void *rsp; void *rsp;
void *qhandle; //used by query and retrieve msg
} SRspRet; } SRspRet;
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
@ -31,7 +33,8 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void vnodeRelease(void *pVnode);
void* vnodeGetVnode(int32_t vgId); void* vnodeAccquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeGetRqueue(void *); void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId); void* vnodeGetWqueue(int32_t vgId);
@ -41,6 +44,8 @@ void* vnodeGetTsdb(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item);
void vnodeBuildStatusMsg(void * param); void vnodeBuildStatusMsg(void * param);
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -23,7 +23,7 @@ extern "C" {
int32_t mgmtInitBalance(); int32_t mgmtInitBalance();
void mgmtCleanupBalance(); void mgmtCleanupBalance();
void mgmtStartBalanceTimer(int32_t afterMs) ; void mgmtBalanceNotify() ;
int32_t mgmtAllocVnodes(SVgObj *pVgroup); int32_t mgmtAllocVnodes(SVgObj *pVgroup);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -14,7 +14,7 @@
*/ */
#ifndef TDENGINE_MGMT_GRANT_H #ifndef TDENGINE_MGMT_GRANT_H
#define TDENGINE_MGMT_GTANT_H #define TDENGINE_MGMT_GRANT_H
#ifdef __cplusplus #ifdef __cplusplus
"C" { "C" {

View File

@ -18,11 +18,35 @@
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
int32_t mgmtInitBalance() { return 0; } extern int32_t balanceInit();
void mgmtCleanupBalance() {} extern void balanceCleanUp();
void mgmtStartBalanceTimer(int32_t afterMs) {} extern void balanceNotify();
extern int32_t balanceAllocVnodes(SVgObj *pVgroup);
int32_t mgmtInitBalance() {
#ifdef _VPEER
return balanceInit();
#else
return 0;
#endif
}
void mgmtCleanupBalance() {
#ifdef _VPEER
balanceCleanUp();
#endif
}
void mgmtBalanceNotify() {
#ifdef _VPEER
balanceNotify();
#endif
}
int32_t mgmtAllocVnodes(SVgObj *pVgroup) { int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
#ifdef _VPEER
return balanceAllocVnodes(pVgroup);
#else
void * pNode = NULL; void * pNode = NULL;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL; SDnodeObj *pSelDnode = NULL;
@ -53,4 +77,5 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes); mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
#endif
} }

View File

@ -166,8 +166,8 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
} }
static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) { static int32_t mgmtCheckDBParams(SCMCreateDbMsg *pCreate) {
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) { if (pCreate->commitLog < 0 || pCreate->commitLog > 2) {
mError("invalid db option commitLog: %d, only 0 or 1 allowed", pCreate->commitLog); mError("invalid db option commitLog: %d, only 0-2 allowed", pCreate->commitLog);
return TSDB_CODE_INVALID_OPTION; return TSDB_CODE_INVALID_OPTION;
} }

View File

@ -41,7 +41,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
extern int32_t clusterInit(); extern int32_t clusterInit();
extern void clusterCleanUp(); extern void clusterCleanUp();
extern int32_t clusterGetDnodesNum(); extern int32_t clusterGetDnodesNum();
extern void * clusterGetNextDnode(void *pNode, void **pDnode); extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode);
extern void clusterIncDnodeRef(SDnodeObj *pDnode); extern void clusterIncDnodeRef(SDnodeObj *pDnode);
extern void clusterDecDnodeRef(SDnodeObj *pDnode); extern void clusterDecDnodeRef(SDnodeObj *pDnode);
extern SDnodeObj* clusterGetDnode(int32_t dnodeId); extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
@ -251,7 +251,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pDnode->status != TSDB_DN_STATUS_READY) { if (pDnode->status != TSDB_DN_STATUS_READY) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId); mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TSDB_DN_STATUS_READY; pDnode->status = TSDB_DN_STATUS_READY;
mgmtStartBalanceTimer(200); mgmtBalanceNotify();
} }
mgmtDecDnodeRef(pDnode); mgmtDecDnodeRef(pDnode);

View File

@ -1512,7 +1512,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
} else { } else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
} }
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
} }
pMeta->numOfVpeers = pVgroup->numOfVnodes; pMeta->numOfVpeers = pVgroup->numOfVnodes;

View File

@ -32,7 +32,7 @@
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
void *tsVgroupSdb = NULL; void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0; int32_t tsVgUpdateSize = 0;
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
@ -93,11 +93,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; if (pDnode != NULL) {
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[i].vnode = pVgroup->vgId; pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
atomic_add_fetch_32(&pDnode->openVnodes, 1); atomic_add_fetch_32(&pDnode->openVnodes, 1);
mgmtDecDnodeRef(pDnode); mgmtDecDnodeRef(pDnode);
}
} }
mgmtAddVgroupIntoDb(pVgroup); mgmtAddVgroupIntoDb(pVgroup);
@ -236,7 +237,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode); mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
} }
pMsg->ahandle = pVgroup; pMsg->ahandle = pVgroup;
@ -312,27 +313,21 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
} }
for (int32_t i = 0; i < maxReplica; ++i) { for (int32_t i = 0; i < maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "dnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16; pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip"); strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9; pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vnode status"); strcpy(pSchema[cols].name, "vstatus");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
} }
@ -416,15 +411,15 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols++; cols++;
for (int32_t i = 0; i < maxReplica; ++i) { for (int32_t i = 0; i < maxReplica; ++i) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp); tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr); strcpy(pWrite, ipstr);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pVgroup->vnodeGid[i].dnodeId != 0) { if (pVgroup->vnodeGid[i].dnodeId != 0) {
char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i); char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
@ -433,11 +428,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
strcpy(pWrite, "null"); strcpy(pWrite, "null");
} }
cols++; cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
} }
numOfRows++; numOfRows++;
@ -490,14 +480,15 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->daysToKeep = htonl(pCfg->daysToKeep);
pCfg->commitTime = htonl(pCfg->commitTime); pCfg->commitTime = htonl(pCfg->commitTime);
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); vpeerDesc[j].vgId = htonl(pVgroup->vgId);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
} }
return pVnode; return pVnode;

View File

@ -205,6 +205,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size);
ssize_t twrite(int fd, void *buf, size_t n); ssize_t twrite(int fd, void *buf, size_t n);
ssize_t tread(int fd, void *buf, size_t count);
bool taosCheckPthreadValid(pthread_t thread); bool taosCheckPthreadValid(pthread_t thread);
void taosResetPthread(pthread_t *thread); void taosResetPthread(pthread_t *thread);

View File

@ -291,6 +291,30 @@ int taosInitTimer(void (*callback)(int), int ms) {
return 0; return 0;
} }
ssize_t tread(int fd, void *buf, size_t count) {
size_t leftbytes = count;
ssize_t readbytes;
char * tbuf = (char *)buf;
while (leftbytes > 0) {
readbytes = read(fd, (void *)tbuf, leftbytes);
if (readbytes < 0) {
if (errno == EINTR) {
continue;
} else {
return -1;
}
} else if (readbytes == 0) {
return (ssize_t)(count - leftbytes);
}
leftbytes -= readbytes;
tbuf += readbytes;
}
return (ssize_t)count;
}
ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) { ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
size_t leftbytes = size; size_t leftbytes = size;
ssize_t sentbytes; ssize_t sentbytes;
@ -308,6 +332,8 @@ ssize_t tsendfile(int dfd, int sfd, off_t *offset, size_t size) {
else { else {
return -1; return -1;
} }
} else if (sentbytes == 0) {
return (ssize_t)(size - leftbytes);
} }
leftbytes -= sentbytes; leftbytes -= sentbytes;

View File

@ -15,3 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ENDIF () ENDIF ()
ADD_SUBDIRECTORY(tests) ADD_SUBDIRECTORY(tests)
SET_SOURCE_FILES_PROPERTIES(src/sql.c PROPERTIES COMPILE_FLAGS -w)

View File

@ -198,6 +198,12 @@ typedef struct SQInfo {
*/ */
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* destroy the query info struct
* @param pQInfo
*/
void qDestroyQueryInfo(SQInfo* pQInfo);
/** /**
* query on single table * query on single table
* @param pReadMsg * @param pReadMsg

View File

@ -38,7 +38,7 @@ typedef struct SLoserTreeInfo {
SLoserTreeNode *pNode; SLoserTreeNode *pNode;
} SLoserTreeInfo; } SLoserTreeInfo;
uint8_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeInit(SLoserTreeInfo *pTree); void tLoserTreeInit(SLoserTreeInfo *pTree);

View File

@ -1566,6 +1566,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
} }
destroyResultBuf(pRuntimeEnv->pResultBuf); destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
@ -2238,27 +2239,6 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery->pSelectExpr[columnIndex].resBytes * realRowId; pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
} }
void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
if (pQInfo == NULL) {
return;
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
// tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
tfree(pQInfo->pTableDataInfo);
}
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
@ -2266,14 +2246,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->window.ekey, pQuery->order.order); pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
// pQInfo->over = 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pQuery->status = 0; pQuery->status = 0;
pQuery->rec = (SResultRec){0};
pQuery->rec = (SResultRec){0}; pQuery->rec = (SResultRec){0};
changeExecuteScanOrder(pQuery, true); changeExecuteScanOrder(pQuery, true);
@ -4202,8 +4178,9 @@ int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTable
} }
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
pRuntimeEnv->pQuery = pQuery; taosArrayDestroy(cols);
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param; pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1; pRuntimeEnv->cur.vnodeIndex = -1;
if (param != NULL) { if (param != NULL) {
@ -5169,7 +5146,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
} else { } else {
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.size,
pQuery->rec.total);
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
@ -5445,12 +5423,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
} }
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " dTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, "
"timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey,
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType, pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols,
pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols,
pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen,
pQueryMsg->limit, pQueryMsg->offset); pQueryMsg->limit, pQueryMsg->offset);
@ -5731,7 +5707,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
SArray *pTableIdList) { SArray *pTableIdList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
goto _clean_memory; goto _clean_pQInfo_memory;
} }
SQuery *pQuery = calloc(1, sizeof(SQuery)); SQuery *pQuery = calloc(1, sizeof(SQuery));
@ -5841,7 +5817,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
vnodeParametersSafetyCheck(pQuery); vnodeParametersSafetyCheck(pQuery);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); dTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo; return pQInfo;
_clean_memory: _clean_memory:
@ -5860,6 +5836,7 @@ _clean_memory:
tfree(pExprs); tfree(pExprs);
tfree(pGroupbyExpr); tfree(pGroupbyExpr);
_clean_pQInfo_memory:
tfree(pQInfo); tfree(pQInfo);
return NULL; return NULL;
@ -5927,7 +5904,7 @@ static void freeQInfo(SQInfo *pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
dTrace("QInfo:%p start to free SQInfo", pQInfo); dTrace("QInfo:%p start to free QInfo", pQInfo);
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]); tfree(pQuery->sdata[col]);
} }
@ -5941,7 +5918,16 @@ static void freeQInfo(SQInfo *pQInfo) {
// } // }
sem_destroy(&(pQInfo->dataReady)); sem_destroy(&(pQInfo->dataReady));
vnodeQueryFreeQInfoEx(pQInfo); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
tfree(pQInfo->pTableDataInfo);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
@ -5974,6 +5960,8 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
tfree(pQuery); tfree(pQuery);
taosArrayDestroy(pQInfo->pTableIdList);
dTrace("QInfo:%p QInfo is freed", pQInfo); dTrace("QInfo:%p QInfo is freed", pQInfo);
// destroy signature, in order to avoid the query process pass the object safety check // destroy signature, in order to avoid the query process pass the object safety check
@ -6120,6 +6108,11 @@ _query_over:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qDestroyQueryInfo(SQInfo* pQInfo) {
dTrace("QInfo:%p query completed", pQInfo);
freeQInfo(pQInfo);
}
void qTableQuery(SQInfo *pQInfo) { void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo); dTrace("%p freed abort query", pQInfo);
@ -6133,6 +6126,9 @@ void qTableQuery(SQInfo *pQInfo) {
dTrace("QInfo:%p query task is launched", pQInfo); dTrace("QInfo:%p query task is launched", pQInfo);
// sem_post(&pQInfo->dataReady);
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) { if (numOfTables == 1) {
singleTableQueryImpl(pQInfo); singleTableQueryImpl(pQInfo);
@ -6151,18 +6147,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) { return pQInfo->code;
return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
}
} }
sem_wait(&pQInfo->dataReady); sem_wait(&pQInfo->dataReady);
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size, dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
pQInfo->code); pQInfo->code);
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); return pQInfo->code;
} }
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
@ -6207,12 +6199,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) { if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
code = doDumpQueryResult(pQInfo, (*pRsp)->data); code = doDumpQueryResult(pQInfo, (*pRsp)->data);
} else { } else {
setQueryStatus(pQuery, QUERY_OVER);
code = pQInfo->code; code = pQInfo->code;
} }
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
(*pRsp)->completed = 1; // notify no more result to client (*pRsp)->completed = 1; // notify no more result to client
freeQInfo(pQInfo);
} }
return code; return code;

View File

@ -40,7 +40,7 @@ void tLoserTreeDisplay(SLoserTreeInfo* pTree) {
printf("\n"); printf("\n");
} }
uint8_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) { uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) {
int32_t totalEntries = numOfEntries << 1; int32_t totalEntries = numOfEntries << 1;
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); *pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries);

View File

@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
} }
int32_t ref = T_REF_INC(ptNode); int32_t ref = T_REF_INC(ptNode);
pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref) pTrace("%p acquired by data in cache, refcnt:%d", ptNode, ref)
// the data if referenced by at least one object, so the reference count must be greater than the value of 2. // the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2); assert(ref >= 2);
@ -516,7 +516,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*data = NULL; *data = NULL;
int16_t ref = T_REF_DEC(pNode); int16_t ref = T_REF_DEC(pNode);
pTrace("%p is released, refcnt:%d", pNode, ref); pTrace("%p data released, refcnt:%d", pNode, ref);
if (_remove) { if (_remove) {
__cache_wr_lock(pCacheObj); __cache_wr_lock(pCacheObj);

View File

@ -632,7 +632,7 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT, tsInitConfigOption(cfg++, "clog", &tsCommitLog, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
0, 1, 0, TSDB_CFG_UTYPE_NONE); 0, 2, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT, tsInitConfigOption(cfg++, "comp", &tsCompression, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
0, 2, 0, TSDB_CFG_UTYPE_NONE); 0, 2, 0, TSDB_CFG_UTYPE_NONE);

View File

@ -126,7 +126,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
return -1; return -1;
} }
int *idList = calloc(maxId, sizeof(bool)); bool *idList = calloc(maxId, sizeof(bool));
if (idList == NULL) { if (idList == NULL) {
return -1; return -1;
} }
@ -137,7 +137,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
pIdPool->numOfFree += (maxId - pIdPool->maxId); pIdPool->numOfFree += (maxId - pIdPool->maxId);
pIdPool->maxId = maxId; pIdPool->maxId = maxId;
int *oldIdList = pIdPool->freeList; bool *oldIdList = pIdPool->freeList;
pIdPool->freeList = idList; pIdPool->freeList = idList;
free(oldIdList); free(oldIdList);

View File

@ -347,7 +347,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
va_start(argpointer, format); va_start(argpointer, format);
int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer); int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer);
if (writeLen <= 0) { if (writeLen <= 0) {
char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE]; char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE] = {0};
writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer); writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer);
strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE); strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE);
len += MAX_LOGLINE_CONTENT_SIZE; len += MAX_LOGLINE_CONTENT_SIZE;

View File

@ -77,7 +77,7 @@ void taosUnLockNote(int fd, taosNoteInfo * pNote)
void *taosThreadToOpenNewNote(void *param) void *taosThreadToOpenNewNote(void *param)
{ {
char name[NOTE_FILE_NAME_LEN]; char name[NOTE_FILE_NAME_LEN * 2];
taosNoteInfo * pNote = (taosNoteInfo *)param; taosNoteInfo * pNote = (taosNoteInfo *)param;
pNote->taosNoteFlag ^= 1; pNote->taosNoteFlag ^= 1;
@ -170,7 +170,7 @@ void taosGetNoteName(char *fn, taosNoteInfo * pNote)
int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote) int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote)
{ {
char name[NOTE_FILE_NAME_LEN] = "\0"; char name[NOTE_FILE_NAME_LEN * 2] = "\0";
struct stat notestat0, notestat1; struct stat notestat0, notestat1;
int size; int size;

View File

@ -18,7 +18,7 @@
#include "tskiplist.h" #include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level __attribute__ ((unused)) static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level
#if SKIP_LIST_RECORD_PERFORMANCE #if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) { for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]++; pSkipList->state.nLevelNodeCnt[i]++;
@ -26,7 +26,7 @@ static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level
#endif #endif
} }
static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { __attribute__ ((unused)) static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) {
#if SKIP_LIST_RECORD_PERFORMANCE #if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) { for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]--; pSkipList->state.nLevelNodeCnt[i]--;

View File

@ -41,10 +41,13 @@ typedef struct {
void *sync; void *sync;
void *events; void *events;
void *cq; // continuous query void *cq; // continuous query
int32_t replicas;
SVnodeDesc vpeers[TSDB_MAX_MPEERS];
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
void vnodeInitWriteFp(void); void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -29,18 +29,21 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
static void *tsDnodeVnodesHash; static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param); static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int tsOpennedVnodes; static int tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() { static void vnodeInit() {
vnodeInitWriteFp(); vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
if (tsDnodeVnodesHash == NULL) { if (tsDnodeVnodesHash == NULL) {
dError("failed to init vnode list"); dError("failed to init vnode list");
} }
@ -50,7 +53,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code; int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit); pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); SVnodeObj *pTemp = (SVnodeObj *)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
if (pTemp != NULL) { if (pTemp != NULL) {
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
@ -80,15 +83,21 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
} }
} }
char tsdbDir[TSDB_FILENAME_LEN] = {0}; code = vnodeSaveCfg(pVnodeCfg);
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); if (code != TSDB_CODE_SUCCESS) {
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
if (code <0) {
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return code; return code;
} }
dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId); char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
if (code != TSDB_CODE_SUCCESS) {
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return terrno;
}
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code; return code;
@ -113,19 +122,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN]; char temp[TSDB_FILENAME_LEN];
pthread_once(&vnodeModuleInit, vnodeInit); pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj vnodeObj = {0}; SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
vnodeObj.vgId = vnode; pVnode->vgId = vnode;
vnodeObj.status = VN_STATUS_INIT; pVnode->status = VN_STATUS_INIT;
vnodeObj.refCount = 1; pVnode->refCount = 1;
vnodeObj.version = 0; pVnode->version = 0;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
sprintf(temp, "%s/tsdb", rootDir); int32_t code = vnodeReadCfg(pVnode);
void *pTsdb = tsdbOpenRepo(temp); if (code != TSDB_CODE_SUCCESS) {
if (pTsdb == NULL) { dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno; return code;
} }
pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->wqueue = dnodeAllocateWqueue(pVnode);
@ -133,11 +141,24 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, 3, tsCommitLog); pVnode->wal = walOpen(temp, 3, tsCommitLog);
pVnode->tsdb = pTsdb;
pVnode->sync = NULL; pVnode->sync = NULL;
pVnode->events = NULL; pVnode->events = NULL;
pVnode->cq = NULL; pVnode->cq = NULL;
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWALCallback;
sprintf(temp, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(temp, &appH);
if (pTsdb == NULL) {
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno;
}
pVnode->tsdb = pTsdb;
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
pVnode->status = VN_STATUS_READY; pVnode->status = VN_STATUS_READY;
@ -149,7 +170,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
int32_t vnodeClose(int32_t vgId) { int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); SVnodeObj *pVnode = *(SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) return 0; if (pVnode == NULL) return 0;
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
@ -182,21 +203,30 @@ void vnodeRelease(void *pVnodeRaw) {
} }
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
free(pVnode);
tsOpennedVnodes--; tsOpennedVnodes--;
if (tsOpennedVnodes <= 0) { if (tsOpennedVnodes <= 0) {
taosCleanUpIntHash(tsDnodeVnodesHash); taosCleanUpIntHash(tsDnodeVnodesHash);
vnodeModuleInit = PTHREAD_ONCE_INIT; vnodeModuleInit = PTHREAD_ONCE_INIT;
tsDnodeVnodesHash = NULL;
} }
} }
void *vnodeGetVnode(int32_t vgId) { void *vnodeGetVnode(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); SVnodeObj *pVnode = *(SVnodeObj **) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
terrno = TSDB_CODE_INVALID_VGROUP_ID; terrno = TSDB_CODE_INVALID_VGROUP_ID;
return NULL; return NULL;
} }
return pVnode;
}
void *vnodeAccquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount); dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
@ -208,7 +238,7 @@ void *vnodeGetRqueue(void *pVnode) {
} }
void *vnodeGetWqueue(int32_t vgId) { void *vnodeGetWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId); SVnodeObj *pVnode = vnodeAccquireVnode(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
return pVnode->wqueue; return pVnode->wqueue;
} }
@ -227,7 +257,7 @@ void vnodeBuildStatusMsg(void *param) {
} }
static void vnodeBuildVloadMsg(char *pNode, void * param) { static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeObj *pVnode = (SVnodeObj *) pNode; SVnodeObj *pVnode = *(SVnodeObj **) pNode;
if (pVnode->status == VN_STATUS_DELETING) return; if (pVnode->status == VN_STATUS_DELETING) return;
SDMStatusMsg *pStatus = param; SDMStatusMsg *pStatus = param;
@ -240,10 +270,8 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
} }
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
if (pVnode->status == VN_STATUS_DELETING) {
// fix deadlock occured while close system taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
}
//syncStop(pVnode->sync); //syncStop(pVnode->sync);
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb);
@ -251,3 +279,64 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
// TODO: this is a simple implement
static int vnodeWALCallback(void *arg) {
SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal);
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
FILE *fp = fopen(cfgFile, "w");
if (!fp) return errno;
fprintf(fp, "replicas %d\n", pVnodeCfg->cfg.replications);
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
fprintf(fp, "index%d dnode %d ip %u\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip);
}
fclose(fp);
dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
return TSDB_CODE_SUCCESS;
}
// TODO: this is a simple implement
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(cfgFile, "r");
if (!fp) return errno;
char option[3][32] = {0};
int32_t replicas = 0;
int32_t num = fscanf(fp, "%s %d", option[0], &replicas);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "replicas") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (replicas == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->replicas = replicas;
for (int32_t i = 0; i < replicas; ++i) {
int32_t dnodeId = 0;
uint32_t dnodeIp = 0;
num = fscanf(fp, "%s %s %d %s %u", option[0], option[1], &dnodeId, option[2], &dnodeIp);
if (num != 5) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[1], "dnode") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[2], "ip") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeId == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->vpeers[i].dnodeId = dnodeId;
pVnode->vpeers[i].ip = dnodeIp;
pVnode->vpeers[i].vgId = pVnode->vgId;
}
fclose(fp);
dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId);
return TSDB_CODE_SUCCESS;
}

View File

@ -0,0 +1,110 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "dataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "queryExecutor.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg;
}
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessReadMsgFp[msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE;
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
SQInfo* pQInfo = NULL;
if (contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
pRsp->code = pRet->code;
pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp;
dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo);
} else {
pQInfo = pCont;
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
qTableQuery(pQInfo); // do execute query
return code;
}
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo;
code = TSDB_CODE_ACTION_NEED_REPROCESSED;
} else {
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo);
vnodeRelease(pVnode);
}
}
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo);
return code;
}

View File

@ -34,6 +34,15 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1 #define TSDB_INVALID_SUPER_TABLE_ID -1
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
// WAL handle
void *appH;
int (*walCallBack)(void *);
int (*eventCallBack)(void *);
int (*cqueryCallBack)(void *);
} STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION // --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct { typedef struct {
int8_t precision; int8_t precision;
@ -55,7 +64,7 @@ typedef void tsdb_repo_t; // use void to hide implementation details from outsi
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(tsdb_repo_t *repo); int32_t tsdbDropRepo(tsdb_repo_t *repo);
tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbCloseRepo(tsdb_repo_t *repo);
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
int32_t tsdbTriggerCommit(tsdb_repo_t *repo); int32_t tsdbTriggerCommit(tsdb_repo_t *repo);
@ -328,6 +337,12 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
*/ */
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len); SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -322,6 +322,8 @@ typedef struct _tsdb_repo {
// TSDB configuration // TSDB configuration
STsdbCfg config; STsdbCfg config;
STsdbAppH appH;
// The meter meta handle of this TSDB repository // The meter meta handle of this TSDB repository
STsdbMeta *tsdbMeta; STsdbMeta *tsdbMeta;

View File

@ -177,7 +177,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) {
* *
* @return a TSDB repository handle on success, NULL for failure and the error number is set * @return a TSDB repository handle on success, NULL for failure and the error number is set
*/ */
tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { tsdb_repo_t *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
char dataDir[128] = "\0"; char dataDir[128] = "\0";
if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) {
return NULL; return NULL;
@ -191,6 +191,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
pRepo->rootDir = strdup(tsdbDir); pRepo->rootDir = strdup(tsdbDir);
tsdbRestoreCfg(pRepo, &(pRepo->config)); tsdbRestoreCfg(pRepo, &(pRepo->config));
if (pAppH) pRepo->appH = *pAppH;
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables); pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables);
if (pRepo->tsdbMeta == NULL) { if (pRepo->tsdbMeta == NULL) {
@ -366,14 +367,16 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
tsdbInitSubmitMsgIter(pMsg, &msgIter); tsdbInitSubmitMsgIter(pMsg, &msgIter);
SSubmitBlk *pBlock; SSubmitBlk *pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if (tsdbInsertDataToTable(repo, pBlock) < 0) { if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) {
return -1; return code;
} }
} }
return 0; return code;
} }
/** /**
@ -653,7 +656,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
} }
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) {
char fname[128]; char fname[260];
if (pRepo == NULL) return 0; if (pRepo == NULL) return 0;
char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2); char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2);
if (dirName == NULL) { if (dirName == NULL) {
@ -734,7 +737,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
if (pTable == NULL) return -1; if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE_ID;
}
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SDataRow row; SDataRow row;
@ -746,7 +751,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
} }
} }
return 0; return TSDB_CODE_SUCCESS;
} }
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {

View File

@ -49,9 +49,8 @@ typedef struct SQueryFilePos {
} SQueryFilePos; } SQueryFilePos;
typedef struct SDataBlockLoadInfo { typedef struct SDataBlockLoadInfo {
int32_t fileListIndex; SFileGroup* fileGroup;
int32_t fileId; int32_t slot;
int32_t slotIdx;
int32_t sid; int32_t sid;
SArray *pLoadedCols; SArray *pLoadedCols;
} SDataBlockLoadInfo; } SDataBlockLoadInfo;
@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
} }
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) { static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
pBlockLoadInfo->slotIdx = -1; pBlockLoadInfo->slot = -1;
pBlockLoadInfo->fileId = -1;
pBlockLoadInfo->sid = -1; pBlockLoadInfo->sid = -1;
pBlockLoadInfo->fileListIndex = -1; pBlockLoadInfo->fileGroup = NULL;
} }
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1; pCompBlockLoadInfo->fileListIndex = -1;
} }
static int fileOrderComparFn(const void *p1, const void *p2) {
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
if (pInfo1->fileId == pInfo2->fileId) {
return 0;
}
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
}
void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
char suffix[] = ".head";
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
struct dirent *pEntry = NULL;
pVnodeFilesInfo->vnodeId = vnodeId;
char* tsDirectory = "";
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) {
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
// strerror(errno));
return;
}
while ((pEntry = readdir(pDir)) != NULL) {
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
continue;
}
if (pEntry->d_type & DT_DIR) {
continue;
}
size_t len = strlen(pEntry->d_name);
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
continue;
}
int32_t vid = 0;
int32_t fid = 0;
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
if (vid != vnodeId) { /* ignore error files */
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
continue;
}
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
// if (fid > pVnode->fileId || fid < firstFid) {
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
// fid, firstFid, pVnode->fileId);
// continue;
// }
assert(fid >= 0 && vid >= 0);
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
}
closedir(pDir);
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
// pVnodeFilesInfo->dbFilePathPrefix);
// order the files information according their names */
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
}
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
// todo 1. filter not exist table // todo 1. filter not exist table
@ -282,7 +210,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb; pQueryHandle->pTsdb = tsdb;
pQueryHandle->pColumns = pColumnInfo;
pQueryHandle->loadDataAfterSeek = false; pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true; pQueryHandle->isFirstSlot = true;
@ -331,9 +258,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
int32_t vnodeId = 1;
vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo);
return (tsdb_query_handle_t)pQueryHandle; return (tsdb_query_handle_t)pQueryHandle;
} }
@ -468,6 +392,7 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
} }
} }
taosArrayDestroy(sa);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
@ -513,10 +438,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
return true; return true;
} }
} else { // check data in cache } else { // check data in cache
pQueryHandle->cur.fid = -1;
return hasMoreDataInCacheForSingleModel(pQueryHandle); return hasMoreDataInCacheForSingleModel(pQueryHandle);
} }
} else { } else { // next block in the same file
// next block in the same file
cur->slot += step; cur->slot += step;
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
@ -601,9 +526,11 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.size - 1; endPos = blockInfo.size - 1;
pQueryHandle->realNumOfRows = endPos - cur->pos + 1; pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
pCheckInfo->lastKey = blockInfo.window.ekey + 1;
} else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { } else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
endPos = 0; endPos = 0;
pQueryHandle->realNumOfRows = cur->pos + 1; pQueryHandle->realNumOfRows = cur->pos + 1;
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
} else { } else {
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order); endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order);
@ -614,6 +541,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
} else { } else {
pQueryHandle->realNumOfRows = endPos - cur->pos; pQueryHandle->realNumOfRows = endPos - cur->pos;
} }
pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
} else { } else {
if (endPos > cur->pos) { if (endPos > cur->pos) {
pQueryHandle->realNumOfRows = 0; pQueryHandle->realNumOfRows = 0;
@ -621,6 +550,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
} else { } else {
pQueryHandle->realNumOfRows = cur->pos - endPos; pQueryHandle->realNumOfRows = cur->pos - endPos;
} }
assert(0);
} }
} }
@ -751,7 +682,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
int32_t tid = pCheckInfo->tableId.tid; int32_t tid = pCheckInfo->tableId.tid;
while (pCheckInfo->pFileGroup != NULL) { while (pCheckInfo->pFileGroup != NULL) {
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { if (getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup) != TSDB_CODE_SUCCESS) {
break; break;
} }
@ -761,7 +692,6 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pCheckInfo->pFileGroup->fileId, tid); pCheckInfo->pFileGroup->fileId, tid);
pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter); pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter);
continue; continue;
} }
@ -790,7 +720,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks); assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks);
// load first data block into memory failed, caused by disk block error // load first data block into memory failed, caused by disk block error
bool blockLoaded = false; bool blockLoaded = false;
SArray *sa = getDefaultLoadColumns(pQueryHandle, true); SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
// todo no need to loaded at all // todo no need to loaded at all
@ -810,8 +740,12 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pFile->fd = open(pFile->fname, O_RDONLY); pFile->fd = open(pFile->fname, O_RDONLY);
} }
if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1, if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
pCheckInfo->pDataCols, data) == 0) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pCheckInfo->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
blockLoaded = true; blockLoaded = true;
} }
@ -820,17 +754,27 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
// failed to load data from disk, abort current query // failed to load data from disk, abort current query
if (blockLoaded == false) { if (blockLoaded == false) {
taosArrayDestroy(sa);
tfree(data);
return false; return false;
} }
// todo search qualified points in blk, according to primary key (timestamp) column // todo search qualified points in blk, according to primary key (timestamp) column
SDataCols* pDataCols = pCheckInfo->pDataCols; SDataCols* pDataCols = pCheckInfo->pDataCols;
TSKEY* d = (TSKEY*) pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order); cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order);
cur->fid = pCheckInfo->pFileGroup->fileId; cur->fid = pCheckInfo->pFileGroup->fileId;
assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
taosArrayDestroy(sa);
tfree(data);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
@ -838,8 +782,6 @@ static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
// SQueryFilePos* cur = &pHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
if (!pCheckInfo->checkFirstFileBlock) { if (!pCheckInfo->checkFirstFileBlock) {
@ -952,7 +894,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
rows = pHandle->realNumOfRows; rows = pHandle->realNumOfRows;
skey = *(TSKEY*) pColInfoEx->pData; skey = *(TSKEY*) pColInfoEx->pData;
ekey = *(TSKEY*) pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1); ekey = *(TSKEY*) ((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));
} }
} else { } else {
if (pTable->mem != NULL) { if (pTable->mem != NULL) {
@ -990,6 +932,10 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
if (pHandle->cur.fid < 0) { if (pHandle->cur.fid < 0) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
@ -1002,9 +948,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
} else { } else {
SArray *sa = getDefaultLoadColumns(pHandle, true); SArray *sa = getDefaultLoadColumns(pHandle, true);
doLoadDataFromFileBlock(pHandle); // data block has been loaded, todo extract method
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa); SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
return pHandle->pColumns; if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else {
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
}
} }
} }
} }
@ -1351,3 +1304,36 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo
return result; return result;
} }
} }
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf);
}
tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo);
tfree(pTableCheckInfo->compIndex);
}
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < cols; ++i) {
SColumnInfoEx *pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
}
taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->unzipBuffer);
tfree(pQueryHandle->secondaryUnzipBuffer);
tfree(pQueryHandle);
}

View File

@ -140,7 +140,7 @@ TEST(TsdbTest, createRepo) {
// TEST(TsdbTest, DISABLED_openRepo) { // TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, openRepo) { TEST(TsdbTest, openRepo) {
tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(repo, nullptr); ASSERT_NE(repo, nullptr);
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;

View File

@ -125,7 +125,7 @@ int walRenew(twal_h handle) {
if (pWal->num > pWal->max) { if (pWal->num > pWal->max) {
// remove the oldest wal file // remove the oldest wal file
char name[TSDB_FILENAME_LEN]; char name[TSDB_FILENAME_LEN * 3];
sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) <0) { if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno)); wError("wal:%s, failed to remove(%s)", name, strerror(errno));
@ -150,7 +150,7 @@ int walWrite(void *handle, SWalHead *pHead) {
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
pHead->signature = walSignature; pHead->signature = walSignature;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal)); taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
int contLen = pHead->len + sizeof(SWalHead); int contLen = pHead->len + sizeof(SWalHead);
if(write(pWal->fd, pHead, contLen) != contLen) { if(write(pWal->fd, pHead, contLen) != contLen) {
@ -177,7 +177,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, in
uint32_t maxId = 0, minId = -1, index =0; uint32_t maxId = 0, minId = -1, index =0;
int plen = strlen(walPrefix); int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN]; char opath[TSDB_FILENAME_LEN+5];
sprintf(opath, "%s/old", pWal->path); sprintf(opath, "%s/old", pWal->path);
// is there old directory? // is there old directory?
@ -272,7 +272,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
break; break;
} }
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name); wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
break; break;
} }
@ -294,8 +294,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
int walHandleExistingFiles(char *path) { int walHandleExistingFiles(char *path) {
int code = 0; int code = 0;
char oname[TSDB_FILENAME_LEN]; char oname[TSDB_FILENAME_LEN * 3];
char nname[TSDB_FILENAME_LEN]; char nname[TSDB_FILENAME_LEN * 3];
char opath[TSDB_FILENAME_LEN]; char opath[TSDB_FILENAME_LEN];
sprintf(opath, "%s/old", path); sprintf(opath, "%s/old", path);
@ -336,7 +336,7 @@ int walHandleExistingFiles(char *path) {
static int walRemoveWalFiles(char *path) { static int walRemoveWalFiles(char *path) {
int plen = strlen(walPrefix); int plen = strlen(walPrefix);
char name[TSDB_FILENAME_LEN]; char name[TSDB_FILENAME_LEN * 3];
int code = 0; int code = 0;
if (access(path, F_OK) != 0) return 0; if (access(path, F_OK) != 0) return 0;

View File

@ -24,17 +24,45 @@
void taosMsleep(int mseconds); void taosMsleep(int mseconds);
static int32_t doQuery(TAOS* taos, const char* sql) {
int32_t code = taos_query(taos, sql);
if (code != 0) {
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
return -1;
}
TAOS_RES* res = taos_use_result(taos);
TAOS_ROW row = NULL;
char buf[512] = {0};
int32_t numOfFields = taos_num_fields(res);
TAOS_FIELD* pFields = taos_fetch_fields(res);
while((row = taos_fetch_row(res)) != NULL) {
taos_print_row(buf, row, pFields, numOfFields);
printf("%s\n", buf);
memset(buf, 0, 512);
}
taos_free_result(res);
return 0;
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
TAOS * taos; TAOS * taos;
char qstr[1024]; char qstr[1024];
TAOS_RES *result; TAOS_RES *result;
// connect to server // connect to server
if (argc < 2) { if (argc < 2) {
printf("please input server-ip \n"); printf("please input server-ip \n");
return 0; return 0;
} }
taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg");
// init TAOS // init TAOS
taos_init(); taos_init();
@ -45,6 +73,22 @@ int main(int argc, char *argv[]) {
} }
printf("success to connect to server\n"); printf("success to connect to server\n");
doQuery(taos, "create database if not exists test");
doQuery(taos, "use test");
doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
doQuery(taos, "select * from tm0;");
taos_close(taos);
return 0;
taos_query(taos, "drop database demo"); taos_query(taos, "drop database demo");
if (taos_query(taos, "create database demo") != 0) { if (taos_query(taos, "create database demo") != 0) {
@ -53,8 +97,10 @@ int main(int argc, char *argv[]) {
} }
printf("success to create database\n"); printf("success to create database\n");
taos_query(taos, "use demo"); taos_query(taos, "use demo");
// create table // create table
if (taos_query(taos, "create table m1 (ts timestamp, speed int)") != 0) { if (taos_query(taos, "create table m1 (ts timestamp, speed int)") != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(taos)); printf("failed to create table, reason:%s\n", taos_errstr(taos));
@ -62,9 +108,11 @@ int main(int argc, char *argv[]) {
} }
printf("success to create table\n"); printf("success to create table\n");
// sleep for one second to make sure table is created on data node // sleep for one second to make sure table is created on data node
// taosMsleep(1000); // taosMsleep(1000);
// insert 10 records // insert 10 records
int i = 0; int i = 0;
for (i = 0; i < 10; ++i) { for (i = 0; i < 10; ++i) {
@ -76,6 +124,7 @@ int main(int argc, char *argv[]) {
} }
printf("success to insert rows, total %d rows\n", i); printf("success to insert rows, total %d rows\n", i);
// query the records // query the records
sprintf(qstr, "SELECT * FROM m1"); sprintf(qstr, "SELECT * FROM m1");
if (taos_query(taos, qstr) != 0) { if (taos_query(taos, qstr) != 0) {
@ -83,19 +132,24 @@ int main(int argc, char *argv[]) {
exit(1); exit(1);
} }
result = taos_use_result(taos); result = taos_use_result(taos);
if (result == NULL) { if (result == NULL) {
printf("failed to get result, reason:%s\n", taos_errstr(taos)); printf("failed to get result, reason:%s\n", taos_errstr(taos));
exit(1); exit(1);
} }
// TAOS_ROW row;
TAOS_ROW row; TAOS_ROW row;
int rows = 0; int rows = 0;
int num_fields = taos_field_count(taos); int num_fields = taos_field_count(taos);
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256]; char temp[256];
printf("select * from table, result:\n"); printf("select * from table, result:\n");
// fetch the records row by row // fetch the records row by row
while ((row = taos_fetch_row(result))) { while ((row = taos_fetch_row(result))) {

View File

@ -1,17 +1,8 @@
################################# #################################
run general/user/basic1.sim
run general/show/dnodes.sim #run general/table/basic1.sim
run general/db/basic1.sim
run general/db/basic2.sim
run general/db/basic3.sim
run general/db/basic4.sim
run general/db/basic5.sim
run general/table/basic1.sim
run general/table/basic2.sim run general/table/basic2.sim
run general/table/basic3.sim #run general/table/basic3.sim
################################## ##################################

View File

@ -69,7 +69,3 @@ if $data21 != 3 then
return -1 return -1
endi endi
sql drop database d1
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -64,6 +64,3 @@ if $data21 != 3 then
return -1 return -1
endi endi
sql drop database d1
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -1,4 +1,6 @@
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
system sh/exec_up.sh -n dnode1 -s start system sh/exec_up.sh -n dnode1 -s start
system sh/exec_up.sh -n dnode2 -s start system sh/exec_up.sh -n dnode2 -s start
sql connect sql connect

View File

@ -290,7 +290,7 @@ loop_end:
input_buffer->offset += (size_t)(after_end - number_c_string); input_buffer->offset += (size_t)(after_end - number_c_string);
strncpy(item->numberstring, number_c_string, 12); strncpy(item->numberstring, (const char *)number_c_string, 12);
return true; return true;
} }