diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 65d0b1915c..b258a2a252 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -6,13 +6,17 @@ "dockerfile": "Dockerfile", // Update 'VARIANT' to pick an Debian / Ubuntu OS version: debian-11, debian-10, debian-9, ubuntu-21.04, ubuntu-20.04, ubuntu-18.04 // Use Debian 11, Debian 9, Ubuntu 18.04 or Ubuntu 21.04 on local arm64/Apple Silicon - "args": { "VARIANT": "ubuntu-21.04" } + "args": { + "VARIANT": "ubuntu-21.04" + } }, - "runArgs": ["--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined"], - + "runArgs": [ + "--cap-add=SYS_PTRACE", + "--security-opt", + "seccomp=unconfined" + ], // Set *default* container specific settings.json values on container create. "settings": {}, - // Add the IDs of extensions you want installed when the container is created. "extensions": [ "ms-vscode.cpptools", @@ -21,15 +25,13 @@ "visualstudioexptteam.vscodeintel", "eamodio.gitlens", "matepek.vscode-catch2-test-adapter", - "spmeesseman.vscode-taskexplorer" + "spmeesseman.vscode-taskexplorer", + "cschlosser.doxdocgen" ], - // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], - // Use 'postCreateCommand' to run commands after the container is created. // "postCreateCommand": "gcc -v", - // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "root" -} +} \ No newline at end of file diff --git a/include/common/tmsg.h b/include/common/tmsg.h index affab8903c..1b2b6a677a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -156,10 +156,6 @@ typedef struct { uint16_t port; } SEpAddr; -typedef struct { - int32_t numOfVnodes; -} SMsgDesc; - typedef struct SMsgHead { int32_t contLen; int32_t vgId; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 1bd29ce396..3916898829 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -90,23 +90,23 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons /** * Force renew a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) - * @param pRpc (input, rpc object) + * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @return error code */ -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName); /** * Force renew a table's local cached meta data and get the new one. * @param pCatalog (input, got with catalogGetHandle) - * @param pRpc (input, rpc object) + * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name, NOT including db name) * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); /** diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index b326ac032c..18596a9e18 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -154,14 +154,14 @@ typedef struct SVgDataBlocks { char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ... } SVgDataBlocks; -typedef struct SInsertStmtInfo { +typedef struct SVnodeModifOpStmtInfo { int16_t nodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position -} SInsertStmtInfo; +} SVnodeModifOpStmtInfo; typedef struct SDclStmtInfo { int16_t nodeType; diff --git a/include/os/os.h b/include/os/os.h index de2a8182db..972880da9c 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -22,11 +22,13 @@ extern "C" { #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -36,19 +38,14 @@ extern "C" { #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include -#include #include +#include +#include + +#include #include "osAtomic.h" #include "osDef.h" diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index b410255ea4..36ff4194d8 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -54,7 +54,7 @@ bool taosGetSysMemory(float *memoryUsedMB); void taosPrintOsInfo(); int taosSystem(const char *cmd); void taosKillSystem(); -int32_t taosGetSystemUid(char *uid, int32_t uidlen); +int32_t taosGetSystemUUID(char *uid, int32_t uidlen); char * taosGetCmdlineByPID(int pid); void taosSetCoreDump(bool enable); diff --git a/include/util/tfile.h b/include/util/tfile.h index b3d141c443..d3813051a4 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -43,7 +43,7 @@ int32_t tfFsync(int64_t tfd); bool tfValid(int64_t tfd); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); int32_t tfFtruncate(int64_t tfd, int64_t length); - +void * tfMmapReadOnly(int64_t tfd, int64_t length); #ifdef __cplusplus } #endif diff --git a/include/util/thash.h b/include/util/thash.h index f38ab50893..a736fc26af 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -24,7 +24,7 @@ extern "C" { #include "tlockfree.h" typedef uint32_t (*_hash_fn_t)(const char *, uint32_t); -typedef int32_t (*_equal_fn_t)(const void*, const void*, uint32_t len); +typedef int32_t (*_equal_fn_t)(const void*, const void*, size_t len); typedef void (*_hash_before_fn_t)(void *); typedef void (*_hash_free_fn_t)(void *); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bfb884c57e..47d0e517d5 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -53,8 +53,8 @@ static void registerRequest(SRequestObj* pRequest) { int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1); int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1); - tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, - pRequest->pTscObj->id, num, currentInst, total); + tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self, + pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); } } @@ -419,17 +419,20 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { *+------------+-----+-----------+---------------+ * @return */ -static int32_t requestSerialId = 0; uint64_t generateRequestId() { - uint64_t hashId = 0; + static uint64_t hashId = 0; + static int32_t requestSerialId = 0; - char uid[64] = {0}; - int32_t code = taosGetSystemUid(uid, tListLen(uid)); - if (code != TSDB_CODE_SUCCESS) { - tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno))); + if (hashId == 0) { + char uid[64] = {0}; + int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", + tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - hashId = MurmurHash3_32(uid, strlen(uid)); + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } } int64_t ts = taosGetTimestampUs(); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0d590235ad..a967a65c44 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; - tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr); + tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } @@ -203,7 +203,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { - return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); + SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; + int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res); + pRequest->affectedRows = res.numOfRows; + return res.code; } return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); @@ -371,7 +374,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -441,14 +444,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { * There is not response callback function for submit response. * The actual inserted number of points is the first number. */ + int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, - pRequest->metric.rsp - pRequest->metric.start); + tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } else { - tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, - pRequest->metric.rsp - pRequest->metric.start); + tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self, + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); } taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e5f3eba5c2..2b875b3eb5 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -262,6 +262,8 @@ const char *taos_data_type(int type) { const char *taos_get_client_info() { return version; } -int taos_affected_rows(TAOS_RES *res) { return 1; } +int taos_affected_rows(TAOS_RES *res) { + return ((SRequestObj*)res)->affectedRows; +} int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index bb40d9ada2..26f1141cc0 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -57,95 +57,95 @@ TEST(testCase, connect_Test) { taos_close(pConn); } -//TEST(testCase, create_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, create_account_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, drop_account_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} +TEST(testCase, create_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); -//TEST(testCase, show_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show users"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} + TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } -//TEST(testCase, drop_user_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop user abc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + taos_free_result(pRes); + taos_close(pConn); +} -//TEST(testCase, show_db_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show databases"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} +TEST(testCase, create_account_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_account_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show users"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} + +TEST(testCase, drop_user_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop user abc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "show databases"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} TEST(testCase, create_db_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -496,26 +496,31 @@ TEST(testCase, create_multiple_tables) { } taos_free_result(pRes); + +// for(int32_t i = 0; i < 10000; ++i) { +// char sql[512] = {0}; +// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); +// TAOS_RES* pres = taos_query(pConn, sql); +// if (taos_errno(pres) != 0) { +// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); +// } +// taos_free_result(pres); +// } + taos_close(pConn); } TEST(testCase, generated_request_id_test) { - uint64_t id0 = generateRequestId(); + SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - uint64_t id1 = generateRequestId(); - uint64_t id2 = generateRequestId(); - uint64_t id3 = generateRequestId(); - uint64_t id4 = generateRequestId(); + for(int32_t i = 0; i < 1000000; ++i) { + uint64_t v = generateRequestId(); + void* result = taosHashGet(phash, &v, sizeof(v)); + ASSERT_EQ(result, nullptr); + taosHashPut(phash, &v, sizeof(v), NULL, 0); + } - ASSERT_NE(id0, id1); - ASSERT_NE(id1, id2); - ASSERT_NE(id2, id3); - ASSERT_NE(id4, id3); - ASSERT_NE(id0, id2); - ASSERT_NE(id0, id4); - ASSERT_NE(id0, id3); - -// SHashObj *phash = taosHashInit() + taosHashClear(phash); } //TEST(testCase, projection_query_tables) { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 00cfa6b413..b29f8276fe 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -145,7 +145,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { clusterObj.createdTime = taosGetTimestampMs(); clusterObj.updateTime = clusterObj.createdTime; - int32_t code = taosGetSystemUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); + int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN); if (code != 0) { strcpy(clusterObj.name, "tdengine2.0"); mError("failed to get name from system, set to default val %s", clusterObj.name); diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index cbc4d75e8b..5f850f0112 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -86,7 +86,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (pTbCfg->type == META_SUPER_TABLE) { nTagCols = pTbCfg->stbCfg.nTagCols; pTagSchema = pTbCfg->stbCfg.pTagSchema; - } else if (pTbCfg->type == META_SUPER_TABLE) { + } else if (pTbCfg->type == META_CHILD_TABLE) { nTagCols = pStbCfg->stbCfg.nTagCols; pTagSchema = pStbCfg->stbCfg.pTagSchema; } else { diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index b8ad7ab235..4220a604b4 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -255,7 +255,7 @@ static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path) { return -1; } - ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_MPOOL, 0); + ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0); if (ret != 0) { BDB_PERR("Failed to open META env", ret); return -1; @@ -382,8 +382,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey // Second key is the first tag void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); - pDbt[1].data = varDataVal(pTagVal); - pDbt[1].size = varDataLen(pTagVal); + pDbt[1].data = pTagVal; + pDbt[1].size = sizeof(int32_t); // Set index key memset(pSKey, 0, sizeof(*pSKey)); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 236264873e..abcfafa786 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -675,33 +675,31 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta); } -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) { +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) { + if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } SVgroupInfo vgroupInfo = {0}; int32_t code = 0; - CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo)); + CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo)); STableMetaOutput output = {0}; - CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output)); + CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output)); //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output)); CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); _return: - tfree(output.tbMeta); - CTG_RET(code); } -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, true, pTableMeta); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta); } int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index fcc0d5a0b3..d7363f2f4c 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -22,6 +22,8 @@ extern "C" { #include "tfile.h" +//#define USE_MMAP 1 + #define DefaultMem 1024 * 1024 static char tmpFile[] = "./index"; @@ -39,6 +41,9 @@ typedef struct WriterCtx { bool readOnly; char buf[256]; int size; +#ifdef USE_MMAP + char* ptr; +#endif } file; struct { int32_t capa; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 0657c68458..b5a65a9fda 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { // sIdx->cache = (void*)indexCacheCreate(sIdx); sIdx->tindex = indexTFileCreate(path); if (sIdx->tindex == NULL) { goto END; } + sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->cVersion = 1; - sIdx->path = calloc(1, strlen(path) + 1); - memcpy(sIdx->path, path, strlen(path)); + sIdx->path = tstrdup(path); pthread_mutex_init(&sIdx->mtx, NULL); - *index = sIdx; - return 0; #endif + END: if (sIdx != NULL) { indexClose(sIdx); } @@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // Get col info IndexCache* cache = NULL; - pthread_mutex_lock(&sIdx->mtx); char buf[128] = {0}; ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); + pthread_mutex_lock(&sIdx->mtx); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); - if (pCache == NULL) { - pthread_mutex_unlock(&sIdx->mtx); - return -1; - } - cache = *pCache; + cache = (pCache == NULL) ? NULL : *pCache; pthread_mutex_unlock(&sIdx->mtx); *result = taosArrayInit(4, sizeof(uint64_t)); @@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result STermValueType s = kTypeValue; if (0 == indexCacheSearch(cache, query, *result, &s)) { if (s == kTypeDeletion) { - indexInfo("col: %s already drop by other opera", term->colName); + indexInfo("col: %s already drop by", term->colName); // coloum already drop by other oper, no need to query tindex return 0; } else { @@ -402,11 +397,11 @@ static void indexDestroyTempResult(SArray* result) { } int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } - indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); IndexCache* pCache = (IndexCache*)cache; TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); - if (pReader == NULL) { indexWarn("empty pReader found"); } + if (pReader == NULL) { indexWarn("empty tfile reader found"); } // handle flush Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* tfileIter = tfileIteratorCreate(pReader); @@ -504,21 +499,20 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { tfileWriterClose(tw); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); + if (reader == NULL) { goto END; } - char buf[128] = {0}; TFileHeader* header = &reader->header; ICacheKey key = { .suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; pthread_mutex_lock(&sIdx->mtx); - IndexTFile* ifile = (IndexTFile*)sIdx->tindex; tfileCachePut(ifile->cache, &key, reader); - pthread_mutex_unlock(&sIdx->mtx); return ret; END: tfileWriterClose(tw); + return -1; } int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index e95de9286e..517cb1640d 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,7 +20,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later -#define MEM_TERM_LIMIT 5 * 10000 +#define MEM_TERM_LIMIT 10 * 10000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + @@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA return 0; } int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { - if (cache == NULL) { return -1; } + if (cache == NULL) { return 0; } IndexCache* pCache = cache; MemTable *mem = NULL, *imm = NULL; diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 0f29da1c27..2b64d65e46 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -18,7 +18,7 @@ #include "tutil.h" static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { - if (ctx->offset + len > ctx->limit) { return -1; } + // if (ctx->offset + len > ctx->limit) { return -1; } if (ctx->type == TFile) { assert(len == tfWrite(ctx->file.fd, buf, len)); @@ -31,7 +31,12 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { int nRead = 0; if (ctx->type == TFile) { +#ifdef USE_MMAP + nRead = len < ctx->file.size ? len : ctx->file.size; + memcpy(buf, ctx->file.ptr, nRead); +#else nRead = tfRead(ctx->file.fd, buf, len); +#endif } else { memcpy(buf, ctx->mem.buf + ctx->offset, len); } @@ -43,7 +48,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off int nRead = 0; if (ctx->type == TFile) { // tfLseek(ctx->file.fd, offset, 0); +#ifdef USE_MMAP + int32_t last = ctx->file.size - offset; + nRead = last >= len ? len : last; + memcpy(buf, ctx->file.ptr + offset, nRead); +#else nRead = tfPread(ctx->file.fd, buf, len, offset); +#endif } else { // refactor later assert(0); @@ -83,6 +94,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int struct stat fstat; stat(path, &fstat); ctx->file.size = fstat.st_size; +#ifdef USE_MMAP + ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.fd, ctx->file.size); +#endif } memcpy(ctx->file.buf, path, strlen(path)); if (ctx->file.fd < 0) { @@ -112,7 +126,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { free(ctx->mem.buf); } else { tfClose(ctx->file.fd); - ctx->flush(ctx); + if (ctx->file.readOnly) { +#ifdef USE_MMAP + munmap(ctx->file.ptr, ctx->file.size); +#endif + } if (remove) { unlink(ctx->file.buf); } } free(ctx); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index d4d13ddf19..753dbe87b1 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) { for (size_t i = 0; i < taosArrayGetSize(files); i++) { char* file = taosArrayGetP(files, i); - // refactor later, use colname and version info - char colName[256] = {0}; - if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) { - indexInfo("try parse invalid file: %s, skip it", file); - continue; - } - - char fullName[256] = {0}; - sprintf(fullName, "%s/%s", path, file); - - WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64); + WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64); if (wc == NULL) { indexError("failed to open index:%s", file); goto End; } - char buf[128] = {0}; TFileReader* reader = tfileReaderCreate(wc); + if (reader == NULL) { goto End; } TFileHeader* header = &reader->header; - ICacheKey key = {.suid = header->suid, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; + + char buf[128] = {0}; + ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)}; int32_t sz = indexSerialCacheKey(&key, buf); assert(sz < sizeof(buf)); @@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { // sort by coltype and write to tindex if (order == false) { __compar_fn_t fn; - int8_t colType = tw->header.colType; + + int8_t colType = tw->header.colType; if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { fn = tfileStrCompare; } else { @@ -274,7 +264,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { // ugly code, refactor later for (size_t i = 0; i < sz; i++) { TFileValue* v = taosArrayGetP((SArray*)data, i); - // taosArrayRemoveDuplicate(v->tablId, tfileUidCompare, NULL); + taosArraySort(v->tableId, tfileUidCompare); + taosArrayRemoveDuplicate(v->tableId, tfileUidCompare, NULL); int32_t tbsz = taosArrayGetSize(v->tableId); fstOffset += TF_TABLE_TATOAL_SIZE(tbsz); } @@ -351,10 +342,16 @@ void tfileWriterDestroy(TFileWriter* tw) { } IndexTFile* indexTFileCreate(const char* path) { - IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); - if (tfile == NULL) { return NULL; } + TFileCache* cache = tfileCacheCreate(path); + if (cache == NULL) { return NULL; } - tfile->cache = tfileCacheCreate(path); + IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); + if (tfile == NULL) { + tfileCacheDestroy(cache); + return NULL; + } + + tfile->cache = cache; return tfile; } void indexTFileDestroy(IndexTFile* tfile) { @@ -366,6 +363,7 @@ void indexTFileDestroy(IndexTFile* tfile) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int ret = -1; if (tfile == NULL) { return ret; } + IndexTFile* pTfile = (IndexTFile*)tfile; SIndexTerm* term = query->term; @@ -545,12 +543,11 @@ static int tfileReaderLoadHeader(TFileReader* reader) { int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); if (nread == -1) { - // indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno, reader->ctx->file.fd, reader->ctx->file.buf); } else { - indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), - errno, reader->ctx->file.fd, reader->ctx->file.buf); + indexInfo("actual Read: %d, to read: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), + reader->ctx->file.fd, reader->ctx->file.buf); } // assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); @@ -566,7 +563,8 @@ static int tfileReaderLoadFst(TFileReader* reader) { WriterCtx* ctx = reader->ctx; int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); - indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf); + indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf, + ctx->file.size); // we assuse fst size less than FST_MAX_SIZE assert(nread > 0 && nread < FST_MAX_SIZE); @@ -613,15 +611,20 @@ void tfileReaderUnRef(TFileReader* reader) { static SArray* tfileGetFileList(const char* path) { SArray* files = taosArrayInit(4, sizeof(void*)); + char buf[128] = {0}; + uint64_t suid; + uint32_t version; + DIR* dir = opendir(path); if (NULL == dir) { return NULL; } - struct dirent* entry; while ((entry = readdir(dir)) != NULL) { - if (entry->d_type && DT_DIR) { continue; } - size_t len = strlen(entry->d_name); - char* buf = calloc(1, len + 1); - memcpy(buf, entry->d_name, len); + char* file = entry->d_name; + if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; } + + size_t len = strlen(path) + 1 + strlen(file) + 1; + char* buf = calloc(1, len); + sprintf(buf, "%s/%s", path, file); taosArrayPush(files, &buf); } closedir(dir); diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 77b6e02f18..9c92af26a2 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -28,7 +28,7 @@ #include "tutil.h" using namespace std; -#define NUM_OF_THREAD 10 +#define NUM_OF_THREAD 5 class DebugInfo { public: @@ -679,6 +679,17 @@ class IndexObj { } return numOfTable; } + int ReadMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world", + size_t numOfTable = 100 * 10000) { + std::string tColVal = colVal; + + int colValSize = tColVal.size(); + for (int i = 0; i < numOfTable; i++) { + tColVal[i % colValSize] = 'a' + i % 26; + SearchOne(colName, tColVal); + } + return 0; + } int Put(SIndexMultiTerm* fvs, uint64_t uid) { numOfWrite += taosArrayGetSize(fvs); @@ -701,7 +712,8 @@ class IndexObj { int64_t s = taosGetTimestampUs(); if (Search(mq, result) == 0) { int64_t e = taosGetTimestampUs(); - std::cout << "search one successfully and time cost:" << e - s << std::endl; + std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal + << "\t size:" << taosArrayGetSize(result) << std::endl; } else { } int sz = taosArrayGetSize(result); @@ -710,6 +722,31 @@ class IndexObj { return sz; // assert(taosArrayGetSize(result) == targetSize); } + int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) { + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + indexMultiTermQueryAdd(mq, term, QUERY_TERM); + + SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); + + int64_t s = taosGetTimestampUs(); + if (Search(mq, result) == 0) { + int64_t e = taosGetTimestampUs(); + std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName + << "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl; + } else { + } + int sz = taosArrayGetSize(result); + indexMultiTermQueryDestroy(mq); + taosArrayDestroy(result); + assert(sz == 1); + uint64_t* ret = (uint64_t*)taosArrayGet(result, 0); + assert(val = *ret); + + return sz; + } + void PutOne(const std::string& colName, const std::string& colVal) { SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), @@ -718,6 +755,14 @@ class IndexObj { Put(terms, 10); indexMultiTermDestroy(terms); } + void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) { + SIndexMultiTerm* terms = indexMultiTermCreate(); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + indexMultiTermAdd(terms, term); + Put(terms, val); + indexMultiTermDestroy(terms); + } void Debug() { std::cout << "numOfWrite:" << numOfWrite << std::endl; std::cout << "numOfRead:" << numOfRead << std::endl; @@ -830,15 +875,15 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) { assert(numOfTable == target); } -static void write_and_search(IndexObj* idx) { - std::string colName("tag1"), colVal("Hello"); - +static void single_write_and_search(IndexObj* idx) { int target = idx->SearchOne("tag1", "Hello"); - std::cout << "search: " << target << std::endl; target = idx->SearchOne("tag2", "Test"); - std::cout << "search: " << target << std::endl; - - idx->PutOne(colName, colVal); +} +static void multi_write_and_search(IndexObj* idx) { + int target = idx->SearchOne("tag1", "Hello"); + target = idx->SearchOne("tag2", "Test"); + idx->WriteMultiMillonData("tag1", "Hello", 100 * 10000); + idx->WriteMultiMillonData("tag2", "Test", 100 * 10000); } TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { std::string path = "/tmp/cache_and_tfile"; @@ -847,13 +892,27 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { } index->PutOne("tag1", "Hello"); index->PutOne("tag2", "Test"); - index->WriteMultiMillonData("tag1", "Hello", 50 * 10000); - index->WriteMultiMillonData("tag2", "Test", 50 * 10000); + index->WriteMultiMillonData("tag1", "Hello", 100 * 10000); + index->WriteMultiMillonData("tag2", "Test", 100 * 10000); std::thread threads[NUM_OF_THREAD]; for (int i = 0; i < NUM_OF_THREAD; i++) { // - threads[i] = std::thread(write_and_search, index); + threads[i] = std::thread(single_write_and_search, index); + } + for (int i = 0; i < NUM_OF_THREAD; i++) { + // TOD + threads[i].join(); + } +} +TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { + std::string path = "/tmp/cache_and_tfile"; + if (index->Init(path) != 0) {} + + std::thread threads[NUM_OF_THREAD]; + for (int i = 0; i < NUM_OF_THREAD; i++) { + // + threads[i] = std::thread(multi_write_and_search, index); } for (int i = 0; i < NUM_OF_THREAD; i++) { // TOD @@ -862,15 +921,33 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { } TEST_F(IndexEnv2, testIndex_restart) { - std::string path = "/tmp/test1"; + std::string path = "/tmp/cache_and_tfile"; if (index->Init(path) != 0) {} + index->SearchOneTarget("tag1", "Hello", 10); + index->SearchOneTarget("tag2", "Test", 10); } -TEST_F(IndexEnv2, testIndex_performance) { - std::string path = "/tmp/test2"; +TEST_F(IndexEnv2, testIndex_read_performance) { + std::string path = "/tmp/cache_and_tfile"; if (index->Init(path) != 0) {} + index->PutOneTarge("tag1", "Hello", 12); + index->PutOneTarge("tag1", "Hello", 15); + index->ReadMultiMillonData("tag1", "Hello"); + std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; + assert(3 == index->SearchOne("tag1", "Hello")); } TEST_F(IndexEnv2, testIndexMultiTag) { - std::string path = "/tmp/test3"; + std::string path = "/tmp/multi_tag"; if (index->Init(path) != 0) {} + index->WriteMultiMillonData("tag1", "Hello", 100 * 10000); + index->WriteMultiMillonData("tag2", "Test", 100 * 10000); + index->WriteMultiMillonData("tag3", "Test", 100 * 10000); + index->WriteMultiMillonData("tag4", "Test", 100 * 10000); +} +TEST_F(IndexEnv2, testLongComVal) { + std::string path = "/tmp/long_colVal"; + if (index->Init(path) != 0) {} + // gen colVal by randstr + std::string randstr = "xxxxxxxxxxxxxxxxx"; + index->WriteMultiMillonData("tag1", randstr, 100 * 10000); } diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h index 69dad4d9f4..cd84222c65 100644 --- a/source/libs/parser/inc/dataBlockMgt.h +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -126,7 +126,7 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp int32_t idx, int32_t *toffset) { int32_t schemaIdx = 0; if (IS_DATA_COL_ORDERED(spd)) { - schemaIdx = spd->boundedColumns[idx]; + schemaIdx = spd->boundedColumns[idx] - 1; if (isDataRowT(memRowType)) { *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart } else { diff --git a/source/libs/parser/inc/insertParser.h b/source/libs/parser/inc/insertParser.h index b0191b155d..796bd9b429 100644 --- a/source/libs/parser/inc/insertParser.h +++ b/source/libs/parser/inc/insertParser.h @@ -22,7 +22,7 @@ extern "C" { #include "parser.h" -int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo); +int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo); #ifdef __cplusplus } diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 346bd0cbe4..d1629a2a3e 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -70,7 +70,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ */ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); -SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); +SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); /** * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 6b42a93b73..6007fc300c 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -326,6 +326,31 @@ typedef struct SVgroupTablesBatch { SVgroupInfo info; } SVgroupTablesBatch; +static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder, + SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) { + const char* msg1 = "illegal value or data overflow"; + int32_t code = TSDB_CODE_SUCCESS; + + for (int32_t i = 0; i < numOfInputTag; ++i) { + SSchema* pSchema = &pTagSchema[i]; + + char* endPtr = NULL; + char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; + + SKvParam param = {.builder = pKvRowBuilder, .schema = pSchema}; + + SToken* pItem = taosArrayGet(pTagValList, i); + code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); + + if (code != TSDB_CODE_SUCCESS) { + tdDestroyKVRowBuilder(pKvRowBuilder); + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + } + + return code; +} + int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { const char* msg1 = "invalid table name"; const char* msg2 = "tags number not matched"; @@ -354,10 +379,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p } SArray* pValList = pCreateTableInfo->pTagVals; + size_t numOfInputTag = taosArrayGetSize(pValList); - size_t numOfInputTag = taosArrayGetSize(pValList); STableMeta* pSuperTableMeta = NULL; - code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); if (code != TSDB_CODE_SUCCESS) { return code; @@ -463,21 +487,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p return buildInvalidOperationMsg(pMsgBuf, msg2); } - for (int32_t i = 0; i < numOfInputTag; ++i) { - SSchema* pSchema = &pTagSchema[i]; - - char* endPtr = NULL; - char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; - - SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; - - SToken* pItem = taosArrayGet(pValList, i); - code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); - - if (code != TSDB_CODE_SUCCESS) { - tdDestroyKVRowBuilder(&kvRowBuilder); - return buildInvalidOperationMsg(pMsgBuf, msg4); - } + code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf); + if (code != TSDB_CODE_SUCCESS) { + return code; } } @@ -499,9 +511,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); struct SVCreateTbReq req = {0}; - req.type = TD_CHILD_TABLE; - req.name = strdup(tNameGetTableName(&tableName)); - req.ctbCfg.suid = pSuperTableMeta->suid; + req.type = TD_CHILD_TABLE; + req.name = strdup(tNameGetTableName(&tableName)); + req.ctbCfg.suid = pSuperTableMeta->uid; req.ctbCfg.pTag = row; SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); @@ -548,11 +560,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p taosArrayPush(pBufArray, &pVgData); } while (true); - SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo)); - pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; + SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); + pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; pStmtInfo->pDataBlocks = pBufArray; - *pOutput = pStmtInfo; - *len = sizeof(SInsertStmtInfo); + + *pOutput = (char*) pStmtInfo; + *len = sizeof(SVnodeModifOpStmtInfo); return TSDB_CODE_SUCCESS; } @@ -823,14 +836,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c return NULL; } -SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { +SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; assert(pCreateTable->type == TSQL_CREATE_CTABLE); SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf* pMsgBuf = &m; - SInsertStmtInfo* pInsertStmt = NULL; + SVnodeModifOpStmtInfo* pInsertStmt = NULL; int32_t msgLen = 0; int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index b315ea613f..bc4eae7ae9 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -467,7 +467,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB } int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) { - const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); + const int INSERT_HEAD_SIZE = sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 66966f75db..8b3c328cce 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -61,7 +61,7 @@ typedef struct SInsertParseContext { SArray* pTableDataBlocks; // global SArray* pVgDataBlocks; // global int32_t totalNum; - SInsertStmtInfo* pOutput; + SVnodeModifOpStmtInfo* pOutput; } SInsertParseContext; static int32_t skipInsertInto(SInsertParseContext* pCxt) { @@ -106,6 +106,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { SVgroupInfo vg; CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); + pCxt->pTableMeta->vgId = vg.vgId; // todo remove return TSDB_CODE_SUCCESS; } @@ -120,11 +121,9 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS } static void buildMsgHeader(SVgDataBlocks* blocks) { - SMsgDesc* desc = (SMsgDesc*)blocks->pData; - desc->numOfVnodes = htonl(1); - SSubmitMsg* submit = (SSubmitMsg*)(desc + 1); + SSubmitMsg* submit = (SSubmitMsg*)blocks->pData; submit->header.vgId = htonl(blocks->vg.vgId); - submit->header.contLen = htonl(blocks->size - sizeof(SMsgDesc)); + submit->header.contLen = htonl(blocks->size); submit->length = submit->header.contLen; submit->numOfBlocks = htonl(blocks->numOfTables); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); @@ -425,7 +424,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, // 1. set the parsed value from sql string for (int i = 0; i < spd->numOfBound; ++i) { NEXT_TOKEN(pCxt->pSql, sToken); - SSchema *pSchema = &schema[spd->boundedColumns[i]]; + SSchema *pSchema = &schema[spd->boundedColumns[i] - 1]; param.schema = pSchema; param.compareStat = pBuilder->compareStat; getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, ¶m.toffset); @@ -611,7 +610,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { // [(field1_name, ...)] // VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path // [...]; -int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { +int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) { SInsertParseContext context = { .pComCxt = pContext, .pSql = (char*) pContext->pSql, @@ -620,7 +619,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { .pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false), .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .totalNum = 0, - .pOutput = calloc(1, sizeof(SInsertStmtInfo)) + .pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo)) }; if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 1b4d05808c..9455d23a1c 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -53,7 +53,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (toVnode) { - SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); if (pInsertInfo == NULL) { return terrno; } @@ -87,7 +87,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { - return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery); + return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery); } else { return parseQuerySql(pCxt, pQuery); } diff --git a/source/libs/parser/test/insertParserTest.cpp b/source/libs/parser/test/insertParserTest.cpp index 9e681c7ccb..9b007fb36d 100644 --- a/source/libs/parser/test/insertParserTest.cpp +++ b/source/libs/parser/test/insertParserTest.cpp @@ -60,7 +60,7 @@ protected: return code_; } - SInsertStmtInfo* reslut() { + SVnodeModifOpStmtInfo* reslut() { return res_; } @@ -70,9 +70,7 @@ protected: for (size_t i = 0; i < num; ++i) { SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i); cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl; - SMsgDesc* desc = (SMsgDesc*)(vg->pData); - cout << "numOfVnodes:" << ntohl(desc->numOfVnodes) << endl; - SSubmitMsg* submit = (SSubmitMsg*)(desc + 1); + SSubmitMsg* submit = (SSubmitMsg*)vg->pData; cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl; int32_t numOfBlocks = ntohl(submit->numOfBlocks); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); @@ -95,9 +93,7 @@ protected: SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i); ASSERT_EQ(vg->numOfTables, numOfTables); ASSERT_GE(vg->size, 0); - SMsgDesc* desc = (SMsgDesc*)(vg->pData); - ASSERT_EQ(ntohl(desc->numOfVnodes), 1); - SSubmitMsg* submit = (SSubmitMsg*)(desc + 1); + SSubmitMsg* submit = (SSubmitMsg*)vg->pData; ASSERT_GE(ntohl(submit->length), 0); ASSERT_GE(ntohl(submit->numOfBlocks), 0); int32_t numOfBlocks = ntohl(submit->numOfBlocks); @@ -128,7 +124,7 @@ private: char sqlBuf_[max_sql_len]; SParseContext cxt_; int32_t code_; - SInsertStmtInfo* res_; + SVnodeModifOpStmtInfo* res_; }; // INSERT INTO tb_name VALUES (field1_value, ...) diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 3be358fec8..2a0f6b38eb 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -27,8 +27,8 @@ std::unique_ptr mockCatalogService; class TableBuilder : public ITableBuilder { public: virtual TableBuilder& addColumn(const std::string& name, int8_t type, int32_t bytes) { - assert(colId_ < schema()->tableInfo.numOfTags + schema()->tableInfo.numOfColumns); - SSchema* col = schema()->schema + colId_; + assert(colId_ <= schema()->tableInfo.numOfTags + schema()->tableInfo.numOfColumns); + SSchema* col = schema()->schema + (colId_ - 1); col->type = type; col->colId = colId_++; col->bytes = bytes; @@ -66,7 +66,7 @@ private: return std::unique_ptr(new TableBuilder(meta)); } - TableBuilder(STableMeta* schemaMeta) : colId_(0), rowsize_(0), meta_(new MockTableMeta()) { + TableBuilder(STableMeta* schemaMeta) : colId_(1), rowsize_(0), meta_(new MockTableMeta()) { meta_->schema.reset(schemaMeta); } diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 2de0ae2da3..d04fd716c2 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -38,7 +38,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { } static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { - SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode; + SVnodeModifOpStmtInfo* pInsert = (SVnodeModifOpStmtInfo*)pNode; *pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index b6f9612653..3f6f5ab77b 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -33,7 +33,7 @@ protected: void pushScan(const string& db, const string& table, int32_t scanOp) { shared_ptr meta = mockCatalogService->getTableMeta(db, table); EXPECT_TRUE(meta); - unique_ptr scan((SQueryPlanNode*)calloc(1, sizeof(SQueryPlanNode))); + unique_ptr scan((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode))); scan->info.type = scanOp; scan->numOfCols = meta->schema->tableInfo.numOfColumns; scan->pSchema = (SSchema*)myCalloc(1, sizeof(SSchema) * scan->numOfCols); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index b50eb2c92d..507650159f 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -43,15 +43,12 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 bMsg->header.vgId = htonl(bInput->vgId); if (bInput->dbName) { - strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname)); - bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0; + tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname)); } - strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); - bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; + tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname)); *msgLen = (int32_t)sizeof(*bMsg); - return TSDB_CODE_SUCCESS; } @@ -211,7 +208,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl pTableMeta->vgId = isSuperTable ? 0 : msg->vgId; pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType; - pTableMeta->uid = msg->suid; + pTableMeta->uid = msg->tuid; pTableMeta->suid = msg->suid; pTableMeta->sversion = msg->sversion; pTableMeta->tversion = msg->tversion; @@ -250,8 +247,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { pOut->metaNum = 2; if (pMetaMsg->dbFname[0]) { - snprintf(pOut->ctbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); - snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname); + snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); + snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname); } else { memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); @@ -272,7 +269,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); } - code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); + code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta); } return code; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 22556f742b..9079912c40 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -372,7 +372,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -480,7 +480,6 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; - switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { if (rspCode != TSDB_CODE_SUCCESS) { @@ -492,6 +491,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms goto _task_error; } } + + break; } case TDMT_VND_SUBMIT_RSP: { if (rspCode != TSDB_CODE_SUCCESS) { diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index c816cb45a3..d58ea63c4f 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1096,13 +1096,16 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcReqContext *pContext; pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); + char ipstr[24] = {0}; + taosIpPort2String(pRecv->ip, pRecv->port, ipstr); + if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) { - tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, - pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen, + tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, + pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } else { - tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, - pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen, + tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, + pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index df006f44eb..e235b0714e 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -252,7 +252,7 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) { void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); } -int32_t taosGetSystemUid(char *uid, int32_t uidlen) { +int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { GUID guid; CoCreateGuid(&guid); @@ -452,7 +452,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { } } -int32_t taosGetSystemUid(char *uid, int32_t uidlen) { +int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { uuid_t uuid = {0}; uuid_generate(uuid); // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null @@ -1070,7 +1070,7 @@ void taosSetCoreDump(bool enable) { #endif } -int32_t taosGetSystemUid(char *uid, int32_t uidlen) { +int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { int fd; int len = 0; diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 4cb20802c7..0f68e9204d 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -158,3 +158,13 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) { taosReleaseRef(tsFileRsetId, tfd); return code; } + +void *tfMmapReadOnly(int64_t tfd, int64_t length) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return NULL; + int32_t fd = (int32_t)(uintptr_t)p; + + void *ptr = mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0); + taosReleaseRef(tsFileRsetId, tfd); + return ptr; +} diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 2841f27da4..f90b157558 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -28,13 +28,13 @@ tfree(_n); \ } while (0) -#define FREE_HASH_NODE(_h, _n) \ - do { \ - if ((_h)->freeFp) { \ +#define FREE_HASH_NODE(_h, _n) \ + do { \ + if ((_h)->freeFp) { \ (_h)->freeFp(GET_HASH_NODE_DATA(_n)); \ - } \ - \ - DO_FREE_HASH_NODE(_n); \ + } \ + \ + DO_FREE_HASH_NODE(_n); \ } while (0); static FORCE_INLINE void __wr_lock(void *lock, int32_t type) { @@ -55,7 +55,6 @@ static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) { if (type == HASH_NO_LOCK) { return; } - taosRUnLockLatch(lock); } @@ -63,7 +62,6 @@ static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) { if (type == HASH_NO_LOCK) { return; } - taosWUnLockLatch(lock); } @@ -225,12 +223,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // need the resize process, write lock applied if (HASH_NEED_RESIZE(pHashObj)) { - __wr_lock(&pHashObj->lock, pHashObj->type); + __wr_lock((void*) &pHashObj->lock, pHashObj->type); taosHashTableResize(pHashObj); - __wr_unlock(&pHashObj->lock, pHashObj->type); + __wr_unlock((void*) &pHashObj->lock, pHashObj->type); } - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; @@ -272,7 +270,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } // enable resize - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); atomic_add_fetch_32(&pHashObj->size, 1); return 0; @@ -289,7 +287,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da } // enable resize - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return pHashObj->enableUpdate ? 0 : -2; } @@ -308,14 +306,14 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; // no data, return directly if (atomic_load_32(&pe->num) == 0) { - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return NULL; } @@ -358,7 +356,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo taosRUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; } @@ -370,14 +368,14 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; // no data, return directly if (atomic_load_32(&pe->num) == 0) { - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return NULL; } @@ -415,7 +413,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v taosRUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; } @@ -436,7 +434,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; @@ -450,7 +448,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi assert(pe->next == NULL); taosWUnLockLatch(&pe->latch); - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return -1; } @@ -491,7 +489,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi taosWUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return code; } @@ -502,7 +500,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi } // disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int32_t numOfEntries = (int32_t)pHashObj->capacity; for (int32_t i = 0; i < numOfEntries; ++i) { @@ -566,7 +564,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi } } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return 0; } @@ -577,7 +575,7 @@ void taosHashClear(SHashObj *pHashObj) { SHashNode *pNode, *pNext; - __wr_lock(&pHashObj->lock, pHashObj->type); + __wr_lock((void*) &pHashObj->lock, pHashObj->type); for (int32_t i = 0; i < pHashObj->capacity; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; @@ -601,7 +599,7 @@ void taosHashClear(SHashObj *pHashObj) { } atomic_store_32(&pHashObj->size, 0); - __wr_unlock(&pHashObj->lock, pHashObj->type); + __wr_unlock((void*) &pHashObj->lock, pHashObj->type); } void taosHashCleanup(SHashObj *pHashObj) { @@ -864,7 +862,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { char *data = NULL; // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); SHashNode *pNode = NULL; if (p) { @@ -911,7 +909,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { } } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; } @@ -920,7 +918,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { if (pHashObj == NULL || p == NULL) return; // only add the read lock to disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock((void*) &pHashObj->lock, pHashObj->type); int slot; taosHashReleaseNode(pHashObj, p, &slot); @@ -930,7 +928,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { taosWUnLockLatch(&pe->latch); } - __rd_unlock(&pHashObj->lock, pHashObj->type); + __rd_unlock((void*) &pHashObj->lock, pHashObj->type); } void taosHashRelease(SHashObj *pHashObj, void *p) {