diff --git a/.gitignore b/.gitignore index d0af51d31b..83df761654 100644 --- a/.gitignore +++ b/.gitignore @@ -33,5 +33,8 @@ Target/ *.failed *.sql sim/ +psim/ +pysim/ +*.out *DS_Store diff --git a/.travis.yml b/.travis.yml index da0c47ea56..9bc576dcf9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,7 +50,7 @@ matrix: ./test-all.sh $TRAVIS_EVENT_TYPE || travis_terminate $? cd ${TRAVIS_BUILD_DIR}/tests/pytest - ./valgrind-test.sh -g 2>&1 > mem-error-out.txt + ./valgrind-test.sh 2>&1 > mem-error-out.txt sleep 1 # Color setting diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 1654f76f1c..dbbec01329 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -43,7 +43,7 @@ extern "C" { typedef struct SParsedColElem { int16_t colIndex; - int16_t offset; + uint16_t offset; } SParsedColElem; typedef struct SParsedDataColInfo { diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a7eec31388..e6a37a2745 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -49,7 +49,7 @@ typedef struct STableComInfo { uint8_t numOfTags; uint8_t precision; int16_t numOfColumns; - int16_t rowSize; + int32_t rowSize; } STableComInfo; typedef struct STableMeta { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bc717ed88c..110377d112 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -235,7 +235,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont); + tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]); if (pSql->freed || pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, diff --git a/src/dnode/inc/dnodeMain.h b/src/dnode/inc/dnodeMain.h new file mode 100644 index 0000000000..df7698ffc3 --- /dev/null +++ b/src/dnode/inc/dnodeMain.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MAIN_H +#define TDENGINE_DNODE_MAIN_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitSystem(); +void dnodeCleanUpSystem(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 7949a62966..2f693c61fb 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -16,8 +16,6 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taos.h" -#include "tglobal.h" -#include "trpc.h" #include "tutil.h" #include "tconfig.h" #include "tglobal.h" @@ -29,112 +27,14 @@ #include "dnodeVRead.h" #include "dnodeShell.h" #include "dnodeVWrite.h" -#include "tgrant.h" -static int32_t dnodeInitSystem(); static int32_t dnodeInitStorage(); -extern void grantParseParameter(); static void dnodeCleanupStorage(); -static void dnodeCleanUpSystem(); static void dnodeSetRunStatus(SDnodeRunStatus status); -static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); static void dnodeCheckDataDirOpenned(char *dir); static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; -int32_t main(int32_t argc, char *argv[]) { - // Set global configuration file - for (int32_t i = 1; i < argc; ++i) { - if (strcmp(argv[i], "-c") == 0) { - if (i < argc - 1) { - strcpy(configDir, argv[++i]); - } else { - printf("'-c' requires a parameter, default:%s\n", configDir); - exit(EXIT_FAILURE); - } - } else if (strcmp(argv[i], "-V") == 0) { -#ifdef _SYNC - char *versionStr = "enterprise"; -#else - char *versionStr = "community"; -#endif - printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); - printf("gitinfo: %s\n", gitinfo); - printf("gitinfoI: %s\n", gitinfoOfInternal); - printf("buildinfo: %s\n", buildinfo); - exit(EXIT_SUCCESS); - } else if (strcmp(argv[i], "-k") == 0) { - grantParseParameter(); - exit(EXIT_SUCCESS); - } -#ifdef TAOS_MEM_CHECK - else if (strcmp(argv[i], "--alloc-random-fail") == 0) { - if ((i < argc - 1) && (argv[i + 1][0] != '-')) { - taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true); - } else { - taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true); - } - } else if (strcmp(argv[i], "--detect-mem-leak") == 0) { - if ((i < argc - 1) && (argv[i + 1][0] != '-')) { - taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true); - } else { - taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true); - } - } -#endif - } - - /* Set termination handler. */ - struct sigaction act = {{0}}; - act.sa_flags = SA_SIGINFO; - act.sa_sigaction = signal_handler; - sigaction(SIGTERM, &act, NULL); - sigaction(SIGHUP, &act, NULL); - sigaction(SIGINT, &act, NULL); - sigaction(SIGUSR1, &act, NULL); - sigaction(SIGUSR2, &act, NULL); - - // Open /var/log/syslog file to record information. - openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1); - syslog(LOG_INFO, "Starting TDengine service..."); - - // Initialize the system - if (dnodeInitSystem() < 0) { - syslog(LOG_ERR, "Error initialize TDengine system"); - closelog(); - - dnodeCleanUpSystem(); - exit(EXIT_FAILURE); - } - - syslog(LOG_INFO, "Started TDengine service successfully."); - - while (1) { - sleep(1000); - } -} - -static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { - if (signum == SIGUSR1) { - taosCfgDynamicOptions("debugFlag 135"); - return; - } - if (signum == SIGUSR2) { - taosCfgDynamicOptions("resetlog"); - return; - } - syslog(LOG_INFO, "Shut down signal is %d", signum); - syslog(LOG_INFO, "Shutting down TDengine service..."); - // clean the system. - dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); - dnodeCleanUpSystem(); - // close the syslog - syslog(LOG_INFO, "Shut down TDengine service successfully"); - dPrint("TDengine is shut down!"); - closelog(); - exit(EXIT_SUCCESS); -} - -static int32_t dnodeInitSystem() { +int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); tscEmbedded = 1; taosResolveCRC(); @@ -180,7 +80,7 @@ static int32_t dnodeInitSystem() { return 0; } -static void dnodeCleanUpSystem() { +void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeCleanupShell(); diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c new file mode 100644 index 0000000000..683328db29 --- /dev/null +++ b/src/dnode/src/dnodeSystem.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tgrant.h" +#include "tutil.h" +#include "tglobal.h" +#include "dnodeInt.h" +#include "dnodeMain.h" + +static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); + +int32_t main(int32_t argc, char *argv[]) { + // Set global configuration file + for (int32_t i = 1; i < argc; ++i) { + if (strcmp(argv[i], "-c") == 0) { + if (i < argc - 1) { + strcpy(configDir, argv[++i]); + } else { + printf("'-c' requires a parameter, default:%s\n", configDir); + exit(EXIT_FAILURE); + } + } else if (strcmp(argv[i], "-V") == 0) { +#ifdef _SYNC + char *versionStr = "enterprise"; +#else + char *versionStr = "community"; +#endif + printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); + printf("gitinfo: %s\n", gitinfo); + printf("gitinfoI: %s\n", gitinfoOfInternal); + printf("buildinfo: %s\n", buildinfo); + exit(EXIT_SUCCESS); + } else if (strcmp(argv[i], "-k") == 0) { + grantParseParameter(); + exit(EXIT_SUCCESS); + } +#ifdef TAOS_MEM_CHECK + else if (strcmp(argv[i], "--alloc-random-fail") == 0) { + if ((i < argc - 1) && (argv[i + 1][0] != '-')) { + taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true); + } else { + taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true); + } + } else if (strcmp(argv[i], "--detect-mem-leak") == 0) { + if ((i < argc - 1) && (argv[i + 1][0] != '-')) { + taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true); + } else { + taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true); + } + } +#endif + } + + /* Set termination handler. */ + struct sigaction act = {{0}}; + act.sa_flags = SA_SIGINFO; + act.sa_sigaction = signal_handler; + sigaction(SIGTERM, &act, NULL); + sigaction(SIGHUP, &act, NULL); + sigaction(SIGINT, &act, NULL); + sigaction(SIGUSR1, &act, NULL); + sigaction(SIGUSR2, &act, NULL); + + // Open /var/log/syslog file to record information. + openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1); + syslog(LOG_INFO, "Starting TDengine service..."); + + // Initialize the system + if (dnodeInitSystem() < 0) { + syslog(LOG_ERR, "Error initialize TDengine system"); + closelog(); + + dnodeCleanUpSystem(); + exit(EXIT_FAILURE); + } + + syslog(LOG_INFO, "Started TDengine service successfully."); + + while (1) { + sleep(1000); + } +} + +static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { + if (signum == SIGUSR1) { + taosCfgDynamicOptions("debugFlag 135"); + return; + } + if (signum == SIGUSR2) { + taosCfgDynamicOptions("resetlog"); + return; + } + syslog(LOG_INFO, "Shut down signal is %d", signum); + syslog(LOG_INFO, "Shutting down TDengine service..."); + // clean the system. + dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); + dnodeCleanUpSystem(); + // close the syslog + syslog(LOG_INFO, "Shut down TDengine service successfully"); + dPrint("TDengine is shut down!"); + closelog(); + exit(EXIT_SUCCESS); +} diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index aaf71838bf..aa8cd99785 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { char *pCont = (char *) pMsg->pCont; void *pVnode; - dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); - while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); @@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) { continue; } + dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 5de4c16c50..879082f223 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; + dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); } else { pHead = (SWalHead *)item; } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 5dad1bdb53..61e82720b1 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -193,20 +193,20 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_ACCT_LEN TSDB_UNI_LEN #define TSDB_PASSWORD_LEN TSDB_UNI_LEN -#define TSDB_MAX_COLUMNS 256 +#define TSDB_MAX_COLUMNS 1024 #define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns #define TSDB_NODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 192 #define TSDB_DB_NAME_LEN 32 #define TSDB_COL_NAME_LEN 64 -#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 16 +#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_ALLOWED_SQL_LEN (8*1024*1024U) // sql length should be less than 6mb -#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 16 -#define TSDB_MAX_TAGS_LEN 512 -#define TSDB_MAX_TAGS 32 +#define TSDB_MAX_BYTES_PER_ROW TSDB_MAX_COLUMNS * 64 +#define TSDB_MAX_TAGS_LEN 65536 +#define TSDB_MAX_TAGS 128 #define TSDB_AUTH_LEN 16 #define TSDB_KEY_LEN 16 @@ -236,7 +236,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100) #define TSDB_DEFAULT_PAYLOAD_SIZE 1024 // default payload size #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth -#define TSDB_SQLCMD_SIZE 1024 +#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MAX_VNODES 256 #define TSDB_MIN_VNODES 50 #define TSDB_INVALID_VNODE_NUM 0 diff --git a/src/inc/trpc.h b/src/inc/trpc.h index eff210433f..16223b813a 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -48,6 +48,7 @@ typedef struct { int contLen; int32_t code; void *handle; + void *ahandle; //app handle set by client, for debug purpose } SRpcMsg; typedef struct { diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index f33531583a..7c1fd6c5ab 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -68,7 +68,7 @@ typedef struct SMnodeObj { // todo use dynamic length string typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; + char *tableId; int8_t type; } STableObj; @@ -77,6 +77,7 @@ typedef struct SSuperTableObj { uint64_t uid; int64_t createdTime; int32_t sversion; + int32_t tversion; int32_t numOfColumns; int32_t numOfTags; int8_t reserved[15]; diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index a97b1219a9..975206d52e 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -35,7 +35,8 @@ typedef enum { typedef enum { SDB_KEY_STRING, SDB_KEY_INT, - SDB_KEY_AUTO + SDB_KEY_AUTO, + SDB_KEY_VAR_STRING, } ESdbKey; typedef enum { diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index c0d389f305..087c84effd 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -104,6 +104,14 @@ bool sdbIsServing() { return tsSdbObj.status == SDB_STATUS_SERVING; } +static void *sdbGetObjKey(SSdbTable *pTable, void *key) { + if (pTable->keyType == SDB_KEY_VAR_STRING) { + return *(char **)key; + } + + return key; +} + static char *sdbGetActionStr(int32_t action) { switch (action) { case SDB_ACTION_INSERT: @@ -116,20 +124,25 @@ static char *sdbGetActionStr(int32_t action) { return "invalid"; } -static char *sdbGetkeyStr(SSdbTable *pTable, void *row) { +static char *sdbGetKeyStr(SSdbTable *pTable, void *key) { static char str[16]; switch (pTable->keyType) { case SDB_KEY_STRING: - return (char *)row; + case SDB_KEY_VAR_STRING: + return (char *)key; case SDB_KEY_INT: case SDB_KEY_AUTO: - sprintf(str, "%d", *(int32_t *)row); + sprintf(str, "%d", *(int32_t *)key); return str; default: return "invalid"; } } +static char *sdbGetKeyStrFromObj(SSdbTable *pTable, void *key) { + return sdbGetKeyStr(pTable, sdbGetObjKey(pTable, key)); +} + static void *sdbGetTableFromId(int32_t tableId) { return tsSdbObj.tableList[tableId]; } @@ -332,50 +345,48 @@ void sdbCleanUp() { pthread_mutex_destroy(&tsSdbObj.mutex); } -void sdbIncRef(void *handle, void *pRow) { - if (pRow) { - SSdbTable *pTable = handle; - int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); - atomic_add_fetch_32(pRefCount, 1); - if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { - sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), - *pRefCount); - } +void sdbIncRef(void *handle, void *pObj) { + if (pObj == NULL) return; + + SSdbTable *pTable = handle; + int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); + atomic_add_fetch_32(pRefCount, 1); + if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { + sdbTrace("table:%s, add ref to record:%s:%d", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount); } } -void sdbDecRef(void *handle, void *pRow) { - if (pRow) { - SSdbTable *pTable = handle; - int32_t * pRefCount = (int32_t *)(pRow + pTable->refCountPos); - int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { - sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), - *pRefCount); - } - int8_t *updateEnd = pRow + pTable->refCountPos - 1; - if (refCount <= 0 && *updateEnd) { - sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName, - sdbGetkeyStr(pTable, pRow), *pRefCount); - SSdbOper oper = {.pObj = pRow}; - (*pTable->destroyFp)(&oper); - } +void sdbDecRef(void *handle, void *pObj) { + if (pObj == NULL) return; + + SSdbTable *pTable = handle; + int32_t * pRefCount = (int32_t *)(pObj + pTable->refCountPos); + int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); + if (0 && (pTable->tableId == SDB_TABLE_MNODE || pTable->tableId == SDB_TABLE_DNODE)) { + sdbTrace("table:%s, def ref of record:%s:%d", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount); + } + + int8_t *updateEnd = pObj + pTable->refCountPos - 1; + if (refCount <= 0 && *updateEnd) { + sdbTrace("table:%s, record:%s:%d is destroyed", pTable->tableName, sdbGetKeyStrFromObj(pTable, pObj), *pRefCount); + SSdbOper oper = {.pObj = pObj}; + (*pTable->destroyFp)(&oper); } } -static SSdbRow *sdbGetRowMeta(void *handle, void *key) { - SSdbTable *pTable = (SSdbTable *)handle; - SSdbRow * pMeta; - - if (handle == NULL) return NULL; +static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) { + if (pTable == NULL) return NULL; int32_t keySize = sizeof(int32_t); - if (pTable->keyType == SDB_KEY_STRING) { + if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { keySize = strlen((char *)key); } - pMeta = taosHashGet(pTable->iHandle, key, keySize); + + return taosHashGet(pTable->iHandle, key, keySize); +} - return pMeta; +static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { + return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key)); } void *sdbGetRow(void *handle, void *key) { @@ -387,7 +398,7 @@ void *sdbGetRow(void *handle, void *key) { pthread_mutex_lock(&pTable->mutex); int32_t keySize = sizeof(int32_t); - if (pTable->keyType == SDB_KEY_STRING) { + if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { keySize = strlen((char *)key); } pMeta = taosHashGet(pTable->iHandle, key, keySize); @@ -400,6 +411,10 @@ void *sdbGetRow(void *handle, void *key) { return pMeta->row; } +static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) { + return sdbGetRow(pTable, sdbGetObjKey(pTable, key)); +} + static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { SSdbRow rowMeta; rowMeta.rowSize = pOper->rowSize; @@ -407,11 +422,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { pthread_mutex_lock(&pTable->mutex); + void * key = sdbGetObjKey(pTable, pOper->pObj); int32_t keySize = sizeof(int32_t); - if (pTable->keyType == SDB_KEY_STRING) { - keySize = strlen((char *)pOper->pObj); + + if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { + keySize = strlen((char *)key); } - taosHashPut(pTable->iHandle, pOper->pObj, keySize, &rowMeta, sizeof(SSdbRow)); + + taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow)); sdbIncRef(pTable, pOper->pObj); pTable->numOfRows++; @@ -425,7 +443,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { pthread_mutex_unlock(&pTable->mutex); sdbTrace("table:%s, insert record:%s to hash, numOfRows:%d version:%" PRIu64, pTable->tableName, - sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); + sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); (*pTable->insertFp)(pOper); return TSDB_CODE_SUCCESS; @@ -436,17 +454,20 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { pthread_mutex_lock(&pTable->mutex); + void * key = sdbGetObjKey(pTable, pOper->pObj); int32_t keySize = sizeof(int32_t); - if (pTable->keyType == SDB_KEY_STRING) { - keySize = strlen((char *)pOper->pObj); + + if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { + keySize = strlen((char *)key); } - taosHashRemove(pTable->iHandle, pOper->pObj, keySize); + + taosHashRemove(pTable->iHandle, key, keySize); pTable->numOfRows--; pthread_mutex_unlock(&pTable->mutex); sdbTrace("table:%s, delete record:%s from hash, numOfRows:%d version:%" PRIu64, pTable->tableName, - sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); + sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1; *updateEnd = 1; @@ -457,7 +478,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) { static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { sdbTrace("table:%s, update record:%s in hash, numOfRows:%d version:%" PRIu64, pTable->tableName, - sdbGetkeyStr(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); + sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion()); (*pTable->updateFp)(pOper); return TSDB_CODE_SUCCESS; @@ -488,7 +509,7 @@ static int sdbWrite(void *param, void *data, int type) { } else if (pHead->version != tsSdbObj.version + 1) { pthread_mutex_unlock(&tsSdbObj.mutex); sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, - pTable->tableName, sdbGetActionStr(action), sdbGetkeyStr(pTable, pHead->cont), pHead->version, + pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, tsSdbObj.version); return TSDB_CODE_OTHERS; } else { @@ -540,8 +561,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - if (sdbGetRow(pTable, pOper->pObj)) { - sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj)); + if (sdbGetRowFromObj(pTable, pOper->pObj)) { + sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj)); sdbDecRef(pTable, pOper->pObj); return TSDB_CODE_ALREADY_THERE; } @@ -580,7 +601,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - SSdbRow *pMeta = sdbGetRowMeta(pTable, pOper->pObj); + SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); if (pMeta == NULL) { sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); return -1; @@ -590,25 +611,27 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { assert(pMetaRow != NULL); if (pOper->type == SDB_OPER_GLOBAL) { - int32_t rowSize = 0; + void * key = sdbGetObjKey(pTable, pOper->pObj); + int32_t keySize = 0; switch (pTable->keyType) { case SDB_KEY_STRING: - rowSize = strlen((char *)pOper->pObj) + 1; + case SDB_KEY_VAR_STRING: + keySize = strlen((char *)key) + 1; break; case SDB_KEY_INT: case SDB_KEY_AUTO: - rowSize = sizeof(uint64_t); + keySize = sizeof(uint32_t); break; default: return -1; } - int32_t size = sizeof(SWalHead) + rowSize; + int32_t size = sizeof(SWalHead) + keySize; SWalHead *pHead = taosAllocateQitem(size); pHead->version = 0; - pHead->len = rowSize; + pHead->len = keySize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; - memcpy(pHead->cont, pOper->pObj, rowSize); + memcpy(pHead->cont, key, keySize); int32_t code = sdbWrite(pOper, pHead, pHead->msgType); taosFreeQitem(pHead); @@ -622,7 +645,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - SSdbRow *pMeta = sdbGetRowMeta(pTable, pOper->pObj); + SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); if (pMeta == NULL) { sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); return -1; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 804572b9ff..d8bcf67242 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { + mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); return; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 3e3b16fa9d..637201fdfb 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -84,6 +84,7 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName); static void mgmtDestroyChildTable(SChildTableObj *pTable) { + tfree(pTable->info.tableId); tfree(pTable->schema); tfree(pTable->sql); tfree(pTable); @@ -180,6 +181,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) { SChildTableObj *pNew = pOper->pObj; SChildTableObj *pTable = mgmtGetChildTable(pNew->info.tableId); if (pTable != pNew) { + void *oldTableId = pTable->info.tableId; void *oldSql = pTable->sql; void *oldSchema = pTable->schema; memcpy(pTable, pNew, pOper->rowSize); @@ -188,6 +190,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) { free(pNew); free(oldSql); free(oldSchema); + free(oldTableId); } mgmtDecTableRef(pTable); @@ -195,51 +198,66 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) { } static int32_t mgmtChildTableActionEncode(SSdbOper *pOper) { - const int32_t maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); SChildTableObj *pTable = pOper->pObj; assert(pTable != NULL && pOper->rowData != NULL); - if (pTable->info.type == TSDB_CHILD_TABLE) { - memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); - pOper->rowSize = tsChildTableUpdateSize; - } else { + int32_t len = strlen(pTable->info.tableId); + if (len > TSDB_TABLE_ID_LEN) return TSDB_CODE_INVALID_TABLE_ID; + + memcpy(pOper->rowData, pTable->info.tableId, len); + memset(pOper->rowData + len, 0, 1); + len++; + + memcpy(pOper->rowData + len, (char*)pTable + sizeof(char *), tsChildTableUpdateSize); + len += tsChildTableUpdateSize; + + if (pTable->info.type != TSDB_CHILD_TABLE) { int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); - if (maxRowSize < tsChildTableUpdateSize + schemaSize) { - return TSDB_CODE_INVALID_MSG_LEN; + memcpy(pOper->rowData + len, pTable->schema, schemaSize); + len += schemaSize; + + if (pTable->sqlLen != 0) { + memcpy(pOper->rowData + len, pTable->sql, pTable->sqlLen); + len += pTable->sqlLen; } - memcpy(pOper->rowData, pTable, tsChildTableUpdateSize); - memcpy(pOper->rowData + tsChildTableUpdateSize, pTable->schema, schemaSize); - memcpy(pOper->rowData + tsChildTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen); - pOper->rowSize = tsChildTableUpdateSize + schemaSize + pTable->sqlLen; } + pOper->rowSize = len; + return TSDB_CODE_SUCCESS; } static int32_t mgmtChildTableActionDecode(SSdbOper *pOper) { assert(pOper->rowData != NULL); SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); - if (pTable == NULL) { - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } + if (pTable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; - memcpy(pTable, pOper->rowData, tsChildTableUpdateSize); + int32_t len = strlen(pOper->rowData); + if (len > TSDB_TABLE_ID_LEN) return TSDB_CODE_INVALID_TABLE_ID; + pTable->info.tableId = strdup(pOper->rowData); + len++; + + memcpy((char*)pTable + sizeof(char *), pOper->rowData + len, tsChildTableUpdateSize); + len += tsChildTableUpdateSize; if (pTable->info.type != TSDB_CHILD_TABLE) { int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); pTable->schema = (SSchema *)malloc(schemaSize); if (pTable->schema == NULL) { mgmtDestroyChildTable(pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + return TSDB_CODE_INVALID_TABLE_TYPE; } - memcpy(pTable->schema, pOper->rowData + tsChildTableUpdateSize, schemaSize); + memcpy(pTable->schema, pOper->rowData + len, schemaSize); + len += schemaSize; - pTable->sql = (char *)malloc(pTable->sqlLen); - if (pTable->sql == NULL) { - mgmtDestroyChildTable(pTable); - return TSDB_CODE_SERV_OUT_OF_MEMORY; + if (pTable->sqlLen != 0) { + pTable->sql = malloc(pTable->sqlLen); + if (pTable->sql == NULL) { + mgmtDestroyChildTable(pTable); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + memcpy(pTable->sql, pOper->rowData + len, pTable->sqlLen); } - memcpy(pTable->sql, pOper->rowData + tsChildTableUpdateSize + schemaSize, pTable->sqlLen); } pOper->pObj = pTable; @@ -311,15 +329,15 @@ static int32_t mgmtChildTableActionRestored() { static int32_t mgmtInitChildTables() { SChildTableObj tObj; - tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; SSdbTableDesc tableDesc = { .tableId = SDB_TABLE_CTABLE, .tableName = "ctables", .hashSessions = tsMaxTables, - .maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16), + .maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_ID_LEN + TSDB_CQ_SQL_SIZE, .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, - .keyType = SDB_KEY_STRING, + .keyType = SDB_KEY_VAR_STRING, .insertFp = mgmtChildTableActionInsert, .deleteFp = mgmtChildTableActionDelete, .updateFp = mgmtChildTableActionUpdate, @@ -372,6 +390,7 @@ static void mgmtDestroySuperTable(SSuperTableObj *pStable) { taosHashCleanup(pStable->vgHash); pStable->vgHash = NULL; } + tfree(pStable->info.tableId); tfree(pStable->schema); tfree(pStable); } @@ -408,11 +427,13 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) { SSuperTableObj *pNew = pOper->pObj; SSuperTableObj *pTable = mgmtGetSuperTable(pNew->info.tableId); if (pTable != pNew) { + void *oldTableId = pTable->info.tableId; void *oldSchema = pTable->schema; memcpy(pTable, pNew, pOper->rowSize); pTable->schema = pNew->schema; free(pNew->vgHash); free(pNew); + free(oldTableId); free(oldSchema); } mgmtDecTableRef(pTable); @@ -420,40 +441,50 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) { } static int32_t mgmtSuperTableActionEncode(SSdbOper *pOper) { - const int32_t maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); - SSuperTableObj *pStable = pOper->pObj; assert(pOper->pObj != NULL && pOper->rowData != NULL); + int32_t len = strlen(pStable->info.tableId); + if (len > TSDB_TABLE_ID_LEN) len = TSDB_CODE_INVALID_TABLE_ID; + + memcpy(pOper->rowData, pStable->info.tableId, len); + memset(pOper->rowData + len, 0, 1); + len++; + + memcpy(pOper->rowData + len, (char*)pStable + sizeof(char *), tsSuperTableUpdateSize); + len += tsSuperTableUpdateSize; + int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); + memcpy(pOper->rowData + len, pStable->schema, schemaSize); + len += schemaSize; - if (maxRowSize < tsSuperTableUpdateSize + schemaSize) { - return TSDB_CODE_INVALID_MSG_LEN; - } - - memcpy(pOper->rowData, pStable, tsSuperTableUpdateSize); - memcpy(pOper->rowData + tsSuperTableUpdateSize, pStable->schema, schemaSize); - pOper->rowSize = tsSuperTableUpdateSize + schemaSize; + pOper->rowSize = len; return TSDB_CODE_SUCCESS; } static int32_t mgmtSuperTableActionDecode(SSdbOper *pOper) { assert(pOper->rowData != NULL); - SSuperTableObj *pStable = (SSuperTableObj *) calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; - memcpy(pStable, pOper->rowData, tsSuperTableUpdateSize); + int32_t len = strlen(pOper->rowData); + if (len > TSDB_TABLE_ID_LEN) return TSDB_CODE_INVALID_TABLE_ID; + pStable->info.tableId = strdup(pOper->rowData); + len++; + + memcpy((char*)pStable + sizeof(char *), pOper->rowData + len, tsSuperTableUpdateSize); + len += tsSuperTableUpdateSize; int32_t schemaSize = sizeof(SSchema) * (pStable->numOfColumns + pStable->numOfTags); pStable->schema = malloc(schemaSize); if (pStable->schema == NULL) { mgmtDestroySuperTable(pStable); - return -1; + return TSDB_CODE_NOT_SUPER_TABLE; } - memcpy(pStable->schema, pOper->rowData + tsSuperTableUpdateSize, schemaSize); + memcpy(pStable->schema, pOper->rowData + len, schemaSize); + pOper->pObj = pStable; return TSDB_CODE_SUCCESS; @@ -465,15 +496,15 @@ static int32_t mgmtSuperTableActionRestored() { static int32_t mgmtInitSuperTables() { SSuperTableObj tObj; - tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; SSdbTableDesc tableDesc = { .tableId = SDB_TABLE_STABLE, .tableName = "stables", .hashSessions = TSDB_MAX_SUPER_TABLES, - .maxRowSize = tsSuperTableUpdateSize + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16), + .maxRowSize = sizeof(SSuperTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_ID_LEN, .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, - .keyType = SDB_KEY_STRING, + .keyType = SDB_KEY_VAR_STRING, .insertFp = mgmtSuperTableActionInsert, .deleteFp = mgmtSuperTableActionDelete, .updateFp = mgmtSuperTableActionUpdate, @@ -720,18 +751,19 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; - SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj)); + SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { mError("table:%s, failed to create, no enough memory", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } - strcpy(pStable->info.tableId, pCreate->tableId); + pStable->info.tableId = strdup(pCreate->tableId); pStable->info.type = TSDB_SUPER_TABLE; pStable->createdTime = taosGetTimestampMs(); pStable->uid = (((uint64_t) pStable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul)); pStable->sversion = 0; + pStable->tversion = 0; pStable->numOfColumns = htons(pCreate->numOfColumns); pStable->numOfTags = htons(pCreate->numOfTags); @@ -851,7 +883,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i } pStable->numOfTags += ntags; - pStable->sversion++; + pStable->tversion++; SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -878,7 +910,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { memmove(pStable->schema + pStable->numOfColumns + col, pStable->schema + pStable->numOfColumns + col + 1, sizeof(SSchema) * (pStable->numOfTags - col - 1)); pStable->numOfTags--; - pStable->sversion++; + pStable->tversion++; SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -1358,7 +1390,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb } static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) { - SChildTableObj *pTable = (SChildTableObj *) calloc(1, sizeof(SChildTableObj)); + SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj)); if (pTable == NULL) { mError("table:%s, failed to alloc memory", pCreate->tableId); terrno = TSDB_CODE_SERV_OUT_OF_MEMORY; @@ -1371,10 +1403,10 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj pTable->info.type = TSDB_NORMAL_TABLE; } - strcpy(pTable->info.tableId, pCreate->tableId); - pTable->createdTime = taosGetTimestampMs(); - pTable->sid = tid; - pTable->vgId = pVgroup->vgId; + pTable->info.tableId = strdup(pCreate->tableId); + pTable->createdTime = taosGetTimestampMs(); + pTable->sid = tid; + pTable->vgId = pVgroup->vgId; if (pTable->info.type == TSDB_CHILD_TABLE) { char *pTagData = (char *) pCreate->schema; // it is a tag key @@ -1655,7 +1687,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { pMeta->sid = htonl(pTable->sid); pMeta->precision = pDb->cfg.precision; pMeta->tableType = pTable->info.type; - strncpy(pMeta->tableId, pTable->info.tableId, tListLen(pTable->info.tableId)); + strncpy(pMeta->tableId, pTable->info.tableId, strlen(pTable->info.tableId)); if (pTable->info.type == TSDB_CHILD_TABLE) { pMeta->sversion = htons(pTable->superTable->sversion); diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 8b5410a596..520edadc7d 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -49,6 +49,7 @@ typedef struct { char encrypt:3; // encrypt algorithm, 0: no encryption uint16_t tranId; // transcation ID uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list uint32_t destIp; // destination IP address, for NAT scenario diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index edbb9b3e12..7a96571ab9 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in rpcUnlockCache(pCache->lockedBy+hash); pCache->total++; - tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); + // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); return; } @@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy rpcUnlockCache(pCache->lockedBy+hash); if (pData) { - tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); + //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); } else { - tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); + //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); } return pData; @@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash pNext = pNode->next; pCache->total--; pCache->count[hash]--; - tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, - pCache->count[hash]); + //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, + // pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 7e80f0d282..118f57d89c 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -87,6 +87,7 @@ typedef struct { } SRpcReqContext; typedef struct SRpcConn { + char info[50];// debug info: label + pConn + ahandle int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID @@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } - tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); + tTrace("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions); return pRpc; } @@ -299,7 +300,7 @@ void rpcClose(void *param) { tfree(pRpc->connList); pthread_mutex_destroy(&pRpc->mutex); - tTrace("%s RPC is closed", pRpc->label); + tTrace("%s rpc is closed", pRpc->label); tfree(pRpc); } @@ -361,9 +362,10 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection char type = pMsg->msgType; - if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE || type == TSDB_MSG_TYPE_FETCH || - type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META || - type == TSDB_MSG_TYPE_CM_SHOW ) + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_CM_RETRIEVE + || type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP + || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_TABLE_META + || type == TSDB_MSG_TYPE_CM_SHOW ) pContext->connType = RPC_CONN_TCPC; rpcSendReqToServer(pRpc, pContext); @@ -374,8 +376,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) void rpcSendResponse(const SRpcMsg *pRsp) { int msgLen = 0; SRpcConn *pConn = (SRpcConn *)pRsp->handle; - SRpcInfo *pRpc = pConn->pRpc; - SRpcMsg rpcMsg = *pRsp; SRpcMsg *pMsg = &rpcMsg; @@ -393,7 +393,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { rpcLockConn(pConn); if ( pConn->inType == 0 || pConn->user[0] == 0 ) { - tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); + tTrace("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); return; } @@ -409,7 +409,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); pHead->code = htonl(pMsg->code); - + pHead->ahandle = (uint64_t) pConn->ahandle; + // set pConn parameters pConn->inType = 0; @@ -491,6 +492,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); if (peerIp == -1) { tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); + terrno = TSDB_CODE_APP_ERROR; return NULL; } @@ -506,11 +508,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, if (taosOpenConn[connType]) { void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); - if (pConn->chandle) { - tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label, - pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType); - } else { - tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort); + if (pConn->chandle == NULL) { terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; @@ -557,7 +555,7 @@ static void rpcCloseConn(void *thandle) { taosFreeId(pRpc->idPool, pConn->sid); pConn->pContext = NULL; - tTrace("%s %p, rpc connection is closed", pRpc->label, pConn); + tTrace("%s, rpc connection is closed", pConn->info); rpcUnlockConn(pConn); } @@ -619,7 +617,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } if (terrno != 0) { - tWarn("%s %p, user not there or server not ready", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released pConn = NULL; } @@ -634,8 +631,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", - pRpc->label, pConn, sid, pConn->user, pConn->localPort); } return pConn; @@ -660,7 +655,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { if (pConn) { if (pConn->linkUid != pHead->linkUid) { - tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -677,21 +671,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + } + + if (pConn) { + pConn->ahandle = pContext->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); } else { - tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); + tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno)); } return pConn; } static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { - SRpcInfo *pRpc= pConn->pRpc; if (pConn->peerId == 0) { pConn->peerId = pHead->sourceId; } else { if (pConn->peerId != pHead->sourceId) { - tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, + tTrace("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId); return TSDB_CODE_INVALID_VALUE; } @@ -700,17 +698,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { if (pHead->code == 0) { - tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); } else { // do nothing, it is heart beat from client } } else if (pConn->inType == 0) { - tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, - taosMsg[pHead->msgType], pConn->inTranId); + tTrace("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId); rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { - tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]); } // do not reply any message @@ -718,7 +715,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { } if (pConn->inType != 0) { - tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, + tTrace("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId); return TSDB_CODE_LAST_SESSION_NOT_FINISHED; } @@ -750,7 +747,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); + tTrace("%s, peer is still processing the transaction", pConn->info); pConn->tretry++; rpcSendReqHead(pConn); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); @@ -789,7 +786,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { } pConn = rpcGetConnObj(pRpc, sid, pRecv); - if (pConn == NULL) return NULL; + if (pConn == NULL) { + tError("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno)); + return NULL; + } else { + if (rpcIsReq(pHead->msgType)) { + pConn->ahandle = (void *)pHead->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); + } + } rpcLockConn(pConn); sid = pConn->sid; @@ -826,7 +831,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { static void rpcProcessBrokenLink(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; - tTrace("%s %p, link is broken", pRpc->label, pConn); + tTrace("%s, link is broken", pConn->info); // pConn->chandle = NULL; if (pConn->outType) { @@ -837,7 +842,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app - tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); + tTrace("%s, connection is gone, notify the app", pConn->info); /* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; @@ -872,17 +877,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", - pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, + tTrace("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", + pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } int32_t code = terrno; if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error - if ( rpcIsReq(pHead->msgType) ) { + if (rpcIsReq(pHead->msgType)) { rpcSendErrorMsgToPeer(pRecv, code); - tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); + tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -924,6 +929,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; + rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; @@ -948,14 +954,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pContext->redirect++; if (pContext->redirect > TSDB_MAX_REPLICA) { pHead->code = TSDB_CODE_NETWORK_UNAVAIL; - tWarn("%s %p, too many redirects, quit", pRpc->label, pConn); + tWarn("%s, too many redirects, quit", pConn->info); } } if (pHead->code == TSDB_CODE_REDIRECT) { pContext->numOfTry = 0; memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); - tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); + tTrace("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps); for (int i=0; iipSet.numOfIps; ++i) pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); rpcSendReqToServer(pRpc, pContext); @@ -1061,6 +1067,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { return; } + pConn->ahandle = pContext->ahandle; rpcLockConn(pConn); // set the message header @@ -1074,6 +1081,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->destId = pConn->peerId; pHead->port = 0; pHead->linkUid = pConn->linkUid; + pHead->ahandle = (uint64_t)pConn->ahandle; if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set the connection parameters @@ -1091,29 +1099,28 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; - SRpcInfo *pRpc = pConn->pRpc; SRpcHead *pHead = (SRpcHead *)msg; msgLen = rpcAddAuthPart(pConn, msg, msgLen); if ( rpcIsReq(pHead->msgType)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, - pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + tTrace("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", + pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, + msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, + tTrace("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", + pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } + tTrace("connection type is: %d", pConn->connType); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { - tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, - msgLen, writtenLen, strerror(errno)); + tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); } tDump(msg, msgLen); @@ -1128,7 +1135,7 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tTrace("%s connection error happens", pRpc->label); + tTrace("%s %p, connection error happens", pRpc->label, pContext->ahandle); if (pContext->numOfTry >= pContext->ipSet.numOfIps) { rpcMsg.msgType = pContext->msgType+1; @@ -1154,23 +1161,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { rpcLockConn(pConn); if (pConn->outType && pConn->user[0]) { - tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); + tTrace("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; if (pConn->retry < 4) { - tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection - tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); reportDisc = 1; } } else { - tTrace("%s %p, retry timer not processed", pRpc->label, pConn); + tTrace("%s, retry timer not processed", pConn->info); } rpcUnlockConn(pConn); @@ -1187,10 +1192,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcInfo *pRpc = pConn->pRpc; if (pConn->user[0]) { - tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); + tTrace("%s, close the connection since no activity", pConn->info); if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app - tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); + tTrace("%s, notify the app, connection is gone", pConn->info); /* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; @@ -1203,7 +1208,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { } rpcCloseConn(pConn); } else { - tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); } } @@ -1214,11 +1219,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { rpcLockConn(pConn); if (pConn->inType && pConn->user[0]) { - tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); + tTrace("%s, progress timer expired, send progress", pConn->info); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { - tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s, progress timer:%p not processed", pConn->info, tmrId); } rpcUnlockConn(pConn); @@ -1252,7 +1257,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { memcpy(pCont + overhead, buf, compLen); pHead->comp = 1; - tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); + //tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; } else { finalLen = contLen; @@ -1286,7 +1291,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { pNewHead->msgLen = rpcMsgLenFromCont(origLen); rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; - tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); + //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } @@ -1343,7 +1348,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; - SRpcInfo *pRpc = pConn->pRpc; int code = 0; if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ @@ -1371,20 +1375,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta = (int32_t)htonl(pDigest->timeStamp); delta -= (int32_t)taosGetTimestampSec(); if (abs(delta) > 900) { - tWarn("%s %p, time diff:%d is too big, msg discarded", pRpc->label, pConn, delta); + tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); code = TSDB_CODE_INVALID_TIME_STAMP; } else { if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn); + tError("%s, authentication failed, msg discarded", pConn->info); code = TSDB_CODE_AUTH_FAILURE; } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client - tTrace("%s %p, message is authenticated", pRpc->label, pConn); + //tTrace("%s, message is authenticated", pConn->info); } } } else { - tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi); + tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); code = TSDB_CODE_AUTH_FAILURE; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index c551f6b1db..677187e3b9 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) { while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); port = ntohs(sourceAdd.sin_port); - tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); + //tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); if (dataLen < sizeof(SRpcHead)) { tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 1b161a4ad7..f502c1babd 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -80,7 +80,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) { T_READ_MEMBER(ptr, int8_t, pTable->type); int len = *(int *)ptr; ptr = (char *)ptr + sizeof(int); - pTable->name = calloc(1, len + VARSTR_HEADER_SIZE); + pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1); if (pTable->name == NULL) return NULL; varDataSetLen(pTable->name, len); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0f92be0967..6dabc98ae8 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -429,7 +429,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { SVnodeObj *pVnode = ahandle; - vTrace("vgId:%d, data file is synced", pVnode->vgId); + vTrace("vgId:%d, data file is synced, fversion:%" PRId64 "", pVnode->vgId, fversion); pVnode->fversion = fversion; pVnode->version = fversion; diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index ec772fd4d1..8634a523f3 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -12,6 +12,7 @@ python3 ./test.py $1 -f insert/binary.py python3 ./test.py $1 -f insert/nchar.py python3 ./test.py $1 -f insert/nchar-boundary.py python3 ./test.py $1 -f insert/nchar-unicode.py +python3 ./test.py $1 -f insert/multi.py python3 ./test.py $1 -f table/column_name.py python3 ./test.py $1 -f table/column_num.py diff --git a/tests/pytest/insert/bool.py b/tests/pytest/insert/bool.py index 062563f4ab..c175afd8b5 100644 --- a/tests/pytest/insert/bool.py +++ b/tests/pytest/insert/bool.py @@ -58,6 +58,14 @@ class TDTestCase: tdSql.query('select * from tb order by ts desc') tdLog.info('tdSql.checkRow(6)') tdSql.checkRows(6) + tdLog.info('=============== step7') + tdLog.info("insert into tb values (now+6m, true)") + tdSql.execute("insert into tb values (now+5m, true)") + tdLog.info('select * from tb order by ts desc') + tdSql.query('select * from tb order by ts desc') + tdLog.info('tdSql.checkRow(7)') + tdSql.checkRows(7) +# convert end # convert end def stop(self): diff --git a/tests/pytest/smoketest.sh b/tests/pytest/smoketest.sh index c0878354e0..6314b3f83b 100755 --- a/tests/pytest/smoketest.sh +++ b/tests/pytest/smoketest.sh @@ -21,6 +21,8 @@ python3 ./test.py $1 -f insert/date.py python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -f insert/nchar.py python3 ./test.py $1 -s && sleep 1 +python3 ./test.py $1 -f insert/multi.py +python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -f table/column_name.py python3 ./test.py $1 -s && sleep 1 diff --git a/tests/pytest/table/boundary.py b/tests/pytest/table/boundary.py new file mode 100644 index 0000000000..faa222231b --- /dev/null +++ b/tests/pytest/table/boundary.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- + +import random +import string +import subprocess +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init( self, conn ): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + + def getLimitFromSourceCode( self, name ): + cmd = "grep -w '#define %s' ../../src/inc/taosdef.h|awk '{print $3}'" % name + return int(subprocess.check_output(cmd, shell=True)) + + + def generateString( self, length ): + chars = string.ascii_uppercase + string.ascii_lowercase + v = "" + for i in range( length ): + v += random.choice( chars ) + return v + + + def checkTagBoundaries( self ): + tdLog.debug( "checking tag boundaries" ) + tdSql.prepare() + + maxTags = self.getLimitFromSourceCode( 'TSDB_MAX_TAGS' ) + totalTagsLen = self.getLimitFromSourceCode( 'TSDB_MAX_TAGS_LEN' ) + tdLog.notice( "max tags is %d" % maxTags ) + tdLog.notice( "max total tag length is %d" % totalTagsLen ) + + # for binary tags, 2 bytes are used for length + tagLen = (totalTagsLen - maxTags * 2) // maxTags + firstTagLen = totalTagsLen - 2 * maxTags - tagLen * (maxTags - 1) + + sql = "create table cars(ts timestamp, f int) tags(t0 binary(%d)" % firstTagLen + for i in range( 1, maxTags ): + sql += ", t%d binary(%d)" % (i, tagLen) + sql += ");" + + tdLog.debug( "creating super table: " + sql ) + tdSql.execute( sql ) + tdSql.query( 'show stables' ) + tdSql.checkRows( 1 ) + + for i in range( 10 ): + sql = "create table car%d using cars tags('%d'" % (i, i) + sql += ", '0'" * (maxTags - 1) + ");" + tdLog.debug( "creating table: " + sql ) + tdSql.execute( sql ) + + sql = "insert into car%d values(now, 0);" % i + tdLog.debug( "inserting data: " + sql ) + tdSql.execute( sql ) + + tdSql.query( 'show tables' ) + tdLog.info( 'tdSql.checkRow(10)' ) + tdSql.checkRows( 10 ) + + tdSql.query( 'select * from cars;' ) + tdSql.checkRows( 10 ) + + + def checkColumnBoundaries( self ): + tdLog.debug( "checking column boundaries" ) + tdSql.prepare() + + # one column is for timestamp + maxCols = self.getLimitFromSourceCode( 'TSDB_MAX_COLUMNS' ) - 1 + + sql = "create table cars (ts timestamp" + for i in range( maxCols ): + sql += ", c%d int" % i + sql += ");" + tdSql.execute( sql ) + tdSql.query( 'show tables' ) + tdSql.checkRows( 1 ) + + sql = "insert into cars values (now" + for i in range( maxCols ): + sql += ", %d" % i + sql += ");" + tdSql.execute( sql ) + tdSql.query( 'select * from cars' ) + tdSql.checkRows( 1 ) + + + def checkTableNameBoundaries( self ): + tdLog.debug( "checking table name boundaries" ) + tdSql.prepare() + + maxTableNameLen = self.getLimitFromSourceCode( 'TSDB_TABLE_NAME_LEN' ) + tdLog.notice( "table name max length is %d" % maxTableNameLen ) + + name = self.generateString( maxTableNameLen - 1) + tdLog.info( "table name is '%s'" % name ) + + tdSql.execute( "create table %s (ts timestamp, value int)" % name ) + tdSql.execute( "insert into %s values(now, 0)" % name ) + + tdSql.query( 'show tables' ) + tdSql.checkRows( 1 ) + + tdSql.query( 'select * from %s' % name ) + tdSql.checkRows( 1 ) + + + def checkRowBoundaries( self ): + tdLog.debug( "checking row boundaries" ) + tdSql.prepare() + + # 8 bytes for timestamp + maxRowSize = 65536 - 8 + maxCols = self.getLimitFromSourceCode( 'TSDB_MAX_COLUMNS' ) - 1 + + # for binary cols, 2 bytes are used for length + colLen = (maxRowSize - maxCols * 2) // maxCols + firstColLen = maxRowSize - 2 * maxCols - colLen * (maxCols - 1) + + sql = "create table cars (ts timestamp, c0 binary(%d)" % firstColLen + for i in range( 1, maxCols ): + sql += ", c%d binary(%d)" % (i, colLen) + sql += ");" + tdSql.execute( sql ) + tdSql.query( 'show tables' ) + tdSql.checkRows( 1 ) + + col = self.generateString( firstColLen ) + sql = "insert into cars values (now, '%s'" % col + col = self.generateString( colLen ) + for i in range( 1, maxCols ): + sql += ", '%s'" % col + sql += ");" + tdLog.info( sql ); + tdSql.execute( sql ) + tdSql.query( "select * from cars" ) + tdSql.checkRows( 1 ) + + + def run(self): + self.checkTagBoundaries() + self.checkColumnBoundaries() + self.checkTableNameBoundaries() + self.checkRowBoundaries() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/table/column_name.py b/tests/pytest/table/column_name.py index bb1f587a65..aa958fd60c 100644 --- a/tests/pytest/table/column_name.py +++ b/tests/pytest/table/column_name.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- import sys +import string +import random +import subprocess from util.log import * from util.cases import * from util.sql import * @@ -14,34 +17,9 @@ class TDTestCase: def run(self): tdSql.prepare() - # TSIM: system sh/stop_dnodes.sh - # TSIM: - # TSIM: system sh/ip.sh -i 1 -s up - # TSIM: system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 - # TSIM: system sh/cfg.sh -n dnode1 -c walLevel -v 0 - # TSIM: system sh/exec.sh -n dnode1 -s start - # TSIM: - # TSIM: sleep 3000 - # TSIM: sql connect - # TSIM: - # TSIM: $i = 0 - # TSIM: $dbPrefix = lm_cm_db - # TSIM: $tbPrefix = lm_cm_tb - # TSIM: $db = $dbPrefix . $i - # TSIM: $tb = $tbPrefix . $i - # TSIM: - # TSIM: print =============== step1 tdLog.info('=============== step1') - # TSIM: sql create database $db - # TSIM: sql use $db - # TSIM: - # TSIM: sql drop table dd -x step0 tdLog.info('drop table dd -x step0') tdSql.error('drop table dd') - # TSIM: return -1 - # TSIM: step0: - # TSIM: - # TSIM: sql create table $tb(ts timestamp, int) -x step1 tdLog.info('create table tb(ts timestamp, int) -x step1') tdSql.error('create table tb(ts timestamp, int)') # TSIM: return -1 @@ -112,37 +90,24 @@ class TDTestCase: tdLog.info('=============== step4') # TSIM: sql create table $tb (ts timestamp, # a0123456789012345678901234567890123456789 int) + getMaxColNum = "grep -w '#define TSDB_COL_NAME_LEN' ../../src/inc/taosdef.h|awk '{print $3}'" + boundary = int(subprocess.check_output(getMaxColNum, shell=True)) + tdLog.info("get max column name length is %d" % boundary) + chars = string.ascii_uppercase + string.ascii_lowercase + +# col_name = ''.join(random.choices(chars, k=boundary+1)) +# tdLog.info( +# 'create table tb (ts timestamp, %s int), col_name length is %d' % (col_name, len(col_name))) +# tdSql.error( +# 'create table tb (ts timestamp, %s int)' % col_name) + + col_name = ''.join(random.choices(chars, k=boundary)) tdLog.info( - 'create table tb (ts timestamp, a0123456789012345678901234567890123456789 int)') + 'create table tb (ts timestamp, %s int), col_name length is %d' % + (col_name, len(col_name))) tdSql.execute( - 'create table tb (ts timestamp, a0123456789012345678901234567890123456789 int)') - # TSIM: sql drop table $tb - tdLog.info('drop table tb') - tdSql.execute('drop table tb') - # TSIM: - # TSIM: sql show tables - tdLog.info('show tables') - tdSql.query('show tables') - # TSIM: if $rows != 0 then - tdLog.info('tdSql.checkRow(0)') - tdSql.checkRows(0) - # TSIM: return -1 - # TSIM: endi - # TSIM: - # TSIM: print =============== step5 - tdLog.info('=============== step5') - # TSIM: sql create table $tb (ts timestamp, a0123456789 int) - tdLog.info('create table tb (ts timestamp, a0123456789 int)') - tdSql.execute('create table tb (ts timestamp, a0123456789 int)') - # TSIM: sql show tables - tdLog.info('show tables') - tdSql.query('show tables') - # TSIM: if $rows != 1 then - tdLog.info('tdSql.checkRow(1)') - tdSql.checkRows(1) - # TSIM: return -1 - # TSIM: endi - # TSIM: + 'create table tb (ts timestamp, %s int)' % col_name) + # TSIM: sql insert into $tb values (now , 1) tdLog.info("insert into tb values (now , 1)") tdSql.execute("insert into tb values (now , 1)") @@ -152,24 +117,6 @@ class TDTestCase: # TSIM: if $rows != 1 then tdLog.info('tdSql.checkRow(1)') tdSql.checkRows(1) - # TSIM: return -1 - # TSIM: endi - # TSIM: - # TSIM: sql drop database $db - tdLog.info('drop database db') - tdSql.execute('drop database db') - # TSIM: sql show databases - tdLog.info('show databases') - tdSql.query('show databases') - # TSIM: if $rows != 0 then - tdLog.info('tdSql.checkRow(0)') - tdSql.checkRows(0) - # TSIM: return -1 - # TSIM: endi - # TSIM: - # TSIM: - # TSIM: - # TSIM: # convert end def stop(self): diff --git a/tests/pytest/table/column_num.py b/tests/pytest/table/column_num.py index 486541bab7..877f0409dc 100644 --- a/tests/pytest/table/column_num.py +++ b/tests/pytest/table/column_num.py @@ -76,7 +76,7 @@ class TDTestCase: tdSql.checkRows(2) data = "now" - for x in range(0, boundary-1): + for x in range(0, boundary - 1): data = data + ", %d" % x tdLog.info("insert into tb1 values (%s)" % data) tdSql.execute("insert into tb1 values (%s)" % data) diff --git a/tests/pytest/table/tablename-boundary.py b/tests/pytest/table/tablename-boundary.py index fba1b2cf3c..335073065c 100644 --- a/tests/pytest/table/tablename-boundary.py +++ b/tests/pytest/table/tablename-boundary.py @@ -21,7 +21,7 @@ class TDTestCase: tableNameMaxLen = int( subprocess.check_output( getTableNameLen, shell=True)) - tdLog.notice("table name max length is %d" % tableNameMaxLen) + tdLog.info("table name max length is %d" % tableNameMaxLen) chars = string.ascii_uppercase + string.ascii_lowercase tb_name = ''.join(random.choices(chars, k=tableNameMaxLen)) tdLog.info('tb_name length %d' % len(tb_name)) diff --git a/tests/pytest/util/cases.py b/tests/pytest/util/cases.py index 9026cd0f2f..f65b0dfde3 100644 --- a/tests/pytest/util/cases.py +++ b/tests/pytest/util/cases.py @@ -71,7 +71,7 @@ class TDCases: case.run() except Exception as e: tdLog.notice(repr(e)) - tdLog.exit("%s failed: %s" % (__file__, fileName)) + tdLog.exit("%s failed" % (fileName)) case.stop() runNum += 1 continue diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 9f7466a7e7..727016adb3 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -223,8 +223,8 @@ class TDDnode: self.running = 1 tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) - tdLog.debug("wait 4 seconds for the dnode:%d to start." % (self.index)) - time.sleep(4) + tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) + time.sleep(5) def stop(self): if self.valgrind == 0: @@ -233,16 +233,14 @@ class TDDnode: toBeKilled = "valgrind.bin" if self.running != 0: - killCmd = "ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -INT" % ( - toBeKilled, self.cfgDir) - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -INT %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") self.running = 0 tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) @@ -254,15 +252,14 @@ class TDDnode: toBeKilled = "valgrind.bin" if self.running != 0: - killCmd = "ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -KILL" % ( - toBeKilled, self.cfgDir) psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") self.running = 0 tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) @@ -307,21 +304,21 @@ class TDDnodes: self.dnodes.append(TDDnode(10)) def init(self, path): - killCmd = "ps -ef|grep -w taosd | grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - killCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") binPath = os.path.dirname(os.path.realpath(__file__)) binPath = binPath + "/../../../debug/" @@ -407,27 +404,27 @@ class TDDnodes: self.dnodes[i].stop() psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") if processID: cmd = "sudo systemctl stop taosd" os.system(cmd) # if os.system(cmd) != 0 : # tdLog.exit(cmd) - killCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - killCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") # if os.system(cmd) != 0 : # tdLog.exit(cmd) diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 0e7e186206..75f3bd9044 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -15,6 +15,7 @@ import sys import os import time import datetime +import inspect from util.log import * @@ -44,7 +45,12 @@ class TDSql: except BaseException: expectErrNotOccured = False if expectErrNotOccured: - tdLog.exit("failed: sql:%.40s, expect error not occured" % (sql)) + frame = inspect.stack()[1] + callerModule = inspect.getmodule(frame[0]) + callerFilename = callerModule.__file__ + tdLog.exit( + "%s failed: sql:%.40s, expect error not occured" % + (callerFilename, sql)) else: tdLog.info("sql:%.40s, expect error occured" % (sql)) @@ -62,33 +68,39 @@ class TDSql: def checkRows(self, expectRows): if self.queryRows != expectRows: + frame = inspect.stack()[1] + callerModule = inspect.getmodule(frame[0]) + callerFilename = callerModule.__file__ tdLog.exit( - "failed: sql:%.40s, queryRows:%d != expect:%d" % - (self.sql, self.queryRows, expectRows)) + "%s failed: sql:%.40s, queryRows:%d != expect:%d" % + (callerFilename, self.sql, self.queryRows, expectRows)) tdLog.info("sql:%.40s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows)) def checkData(self, row, col, data): + frame = inspect.stack()[1] + callerModule = inspect.getmodule(frame[0]) + callerFilename = callerModule.__file__ + if row < 0: tdLog.exit( - "failed: sql:%.40s, row:%d is smaller than zero" % - (self.sql, row)) + "%s failed: sql:%.40s, row:%d is smaller than zero" % + (callerFilename, self.sql, row)) if col < 0: tdLog.exit( - "failed: sql:%.40s, col:%d is smaller than zero" % - (self.sql, col)) + "%s failed: sql:%.40s, col:%d is smaller than zero" % + (callerFilename, self.sql, col)) if row >= self.queryRows: tdLog.exit( - "failed: sql:%.40s, row:%d is larger than queryRows:%d" % - (self.sql, row, self.queryRows)) + "%s failed: sql:%.40s, row:%d is larger than queryRows:%d" % + (callerFilename, self.sql, row, self.queryRows)) if col >= self.queryCols: tdLog.exit( - "failed: sql:%.40s, col:%d is larger than queryRows:%d" % - (self.sql, col, self.queryCols)) + "%s failed: sql:%.40s, col:%d is larger than queryRows:%d" % + (callerFilename, self.sql, col, self.queryCols)) if self.queryResult[row][col] != data: - tdLog.exit( - "failed: sql:%.40s row:%d col:%d data:%s != expect:%s" % - (self.sql, row, col, self.queryResult[row][col], data)) + tdLog.exit("%s failed: sql:%.40s row:%d col:%d data:%s != expect:%s" % ( + callerFilename, self.sql, row, col, self.queryResult[row][col], data)) if data is None: tdLog.info("sql:%.40s, row:%d col:%d data:%s == expect:%s" % @@ -104,22 +116,26 @@ class TDSql: (self.sql, row, col, self.queryResult[row][col], data)) def getData(self, row, col): + frame = inspect.stack()[1] + callerModule = inspect.getmodule(frame[0]) + callerFilename = callerModule.__file__ + if row < 0: tdLog.exit( - "failed: sql:%.40s, row:%d is smaller than zero" % - (self.sql, row)) + "%s failed: sql:%.40s, row:%d is smaller than zero" % + (callerFilename, self.sql, row)) if col < 0: tdLog.exit( - "failed: sql:%.40s, col:%d is smaller than zero" % - (self.sql, col)) + "%s failed: sql:%.40s, col:%d is smaller than zero" % + (callerFilename, self.sql, col)) if row >= self.queryRows: tdLog.exit( - "failed: sql:%.40s, row:%d is larger than queryRows:%d" % - (self.sql, row, self.queryRows)) + "%s failed: sql:%.40s, row:%d is larger than queryRows:%d" % + (callerFilename, self.sql, row, self.queryRows)) if col >= self.queryCols: tdLog.exit( - "failed: sql:%.40s, col:%d is larger than queryRows:%d" % - (self.sql, col, self.queryCols)) + "%s failed: sql:%.40s, col:%d is larger than queryRows:%d" % + (callerFilename, self.sql, col, self.queryCols)) return self.queryResult[row][col] def executeTimes(self, sql, times): @@ -137,8 +153,12 @@ class TDSql: def checkAffectedRows(self, expectAffectedRows): if self.affectedRows != expectAffectedRows: - tdLog.exit("failed: sql:%.40s, affectedRows:%d != expect:%d" % - (self.sql, self.affectedRows, expectAffectedRows)) + frame = inspect.stack()[1] + callerModule = inspect.getmodule(frame[0]) + callerFilename = callerModule.__file__ + + tdLog.exit("%s failed: sql:%.40s, affectedRows:%d != expect:%d" % ( + callerFilename, self.sql, self.affectedRows, expectAffectedRows)) tdLog.info("sql:%.40s, affectedRows:%d == expect:%d" % (self.sql, self.affectedRows, expectAffectedRows)) diff --git a/tests/pytest/valgrind-test.sh b/tests/pytest/valgrind-test.sh index 77c2b321df..0b5dfc0fa3 100755 --- a/tests/pytest/valgrind-test.sh +++ b/tests/pytest/valgrind-test.sh @@ -1,33 +1,35 @@ #!/bin/bash -python3 ./test.py $1 -f insert/basic.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/int.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/float.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/bigint.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/bool.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/double.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/smallint.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/tinyint.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/binary.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/date.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/nchar.py -python3 ./test.py $1 -s && sleep 1 +python3 ./test.py -g -f insert/basic.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/int.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/float.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/bigint.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/bool.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/double.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/smallint.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/tinyint.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/binary.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/date.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/nchar.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/multi.py +python3 ./test.py -g -s && sleep 1 -python3 ./test.py $1 -f table/column_name.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f table/column_num.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f table/db_table.py -python3 ./test.py $1 -s && sleep 1 +python3 ./test.py -g -f table/column_name.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f table/column_num.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f table/db_table.py +python3 ./test.py -g -s && sleep 1 -python3 ./test.py $1 -f import_merge/importDataLastSub.py -python3 ./test.py $1 -s && sleep 1 +python3 ./test.py -g -f import_merge/importDataLastSub.py +python3 ./test.py -g -s && sleep 1