Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy
This commit is contained in:
commit
eb9e73c089
|
@ -102,25 +102,24 @@ typedef enum {
|
||||||
} ESdbStatus;
|
} ESdbStatus;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SDB_START = 0,
|
SDB_TRANS = 0,
|
||||||
SDB_TRANS = 1,
|
SDB_CLUSTER = 1,
|
||||||
SDB_CLUSTER = 2,
|
SDB_MNODE = 2,
|
||||||
SDB_MNODE = 3,
|
SDB_QNODE = 3,
|
||||||
SDB_QNODE = 4,
|
SDB_SNODE = 4,
|
||||||
SDB_SNODE = 5,
|
SDB_BNODE = 5,
|
||||||
SDB_BNODE = 6,
|
SDB_DNODE = 6,
|
||||||
SDB_DNODE = 7,
|
SDB_USER = 7,
|
||||||
SDB_USER = 8,
|
SDB_AUTH = 8,
|
||||||
SDB_AUTH = 9,
|
SDB_ACCT = 9,
|
||||||
SDB_ACCT = 10,
|
SDB_CONSUMER = 10,
|
||||||
SDB_CONSUMER = 11,
|
SDB_CGROUP = 11,
|
||||||
SDB_CGROUP = 12,
|
SDB_TOPIC = 12,
|
||||||
SDB_TOPIC = 13,
|
SDB_VGROUP = 13,
|
||||||
SDB_VGROUP = 14,
|
SDB_STB = 14,
|
||||||
SDB_STB = 15,
|
SDB_DB = 15,
|
||||||
SDB_DB = 16,
|
SDB_FUNC = 16,
|
||||||
SDB_FUNC = 17,
|
SDB_MAX = 17
|
||||||
SDB_MAX = 18
|
|
||||||
} ESdbType;
|
} ESdbType;
|
||||||
|
|
||||||
typedef struct SSdb SSdb;
|
typedef struct SSdb SSdb;
|
||||||
|
@ -188,6 +187,14 @@ int32_t sdbDeploy(SSdb *pSdb);
|
||||||
*/
|
*/
|
||||||
int32_t sdbReadFile(SSdb *pSdb);
|
int32_t sdbReadFile(SSdb *pSdb);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Write sdb file.
|
||||||
|
*
|
||||||
|
* @param pSdb The sdb object.
|
||||||
|
* @return int32_t 0 for success, -1 for failure.
|
||||||
|
*/
|
||||||
|
int32_t sdbWriteFile(SSdb *pSdb);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Parse and write raw data to sdb, then free the pRaw object
|
* @brief Parse and write raw data to sdb, then free the pRaw object
|
||||||
*
|
*
|
||||||
|
@ -260,7 +267,7 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
|
||||||
*
|
*
|
||||||
* @param pSdb The sdb object.
|
* @param pSdb The sdb object.
|
||||||
* @param pIter The type of the table.
|
* @param pIter The type of the table.
|
||||||
* @record int32_t The number of rows in the table
|
* @return int32_t The number of rows in the table
|
||||||
*/
|
*/
|
||||||
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
||||||
|
|
||||||
|
@ -269,10 +276,19 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
||||||
*
|
*
|
||||||
* @param pSdb The sdb object.
|
* @param pSdb The sdb object.
|
||||||
* @param pIter The type of the table.
|
* @param pIter The type of the table.
|
||||||
* @record int32_t The max id of the table
|
* @return int32_t The max id of the table
|
||||||
*/
|
*/
|
||||||
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
|
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Update the version of sdb
|
||||||
|
*
|
||||||
|
* @param pSdb The sdb object.
|
||||||
|
* @param val The update value of the version.
|
||||||
|
* @return int32_t The current version of sdb
|
||||||
|
*/
|
||||||
|
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val);
|
||||||
|
|
||||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||||
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
|
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
|
||||||
|
|
|
@ -90,12 +90,12 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
|
||||||
/**
|
/**
|
||||||
* Force renew a table's local cached meta data.
|
* Force renew a table's local cached meta data.
|
||||||
* @param pCatalog (input, got with catalogGetHandle)
|
* @param pCatalog (input, got with catalogGetHandle)
|
||||||
* @param pRpc (input, rpc object)
|
* @param pTransporter (input, rpc object)
|
||||||
* @param pMgmtEps (input, mnode EPs)
|
* @param pMgmtEps (input, mnode EPs)
|
||||||
* @param pTableName (input, table name, NOT including db name)
|
* @param pTableName (input, table name, NOT including db name)
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName);
|
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force renew a table's local cached meta data and get the new one.
|
* Force renew a table's local cached meta data and get the new one.
|
||||||
|
|
|
@ -154,14 +154,14 @@ typedef struct SVgDataBlocks {
|
||||||
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
|
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
|
||||||
} SVgDataBlocks;
|
} SVgDataBlocks;
|
||||||
|
|
||||||
typedef struct SInsertStmtInfo {
|
typedef struct SVnodeModifOpStmtInfo {
|
||||||
int16_t nodeType;
|
int16_t nodeType;
|
||||||
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
||||||
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
||||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||||
const char* sql; // current sql statement position
|
const char* sql; // current sql statement position
|
||||||
} SInsertStmtInfo;
|
} SVnodeModifOpStmtInfo;
|
||||||
|
|
||||||
typedef struct SDclStmtInfo {
|
typedef struct SDclStmtInfo {
|
||||||
int16_t nodeType;
|
int16_t nodeType;
|
||||||
|
|
|
@ -160,6 +160,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0339)
|
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0339)
|
||||||
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x033A)
|
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x033A)
|
||||||
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x033B)
|
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x033B)
|
||||||
|
#define TSDB_CODE_SDB_INVALID_WAl_VER TAOS_DEF_ERROR_CODE(0, 0x033C)
|
||||||
|
|
||||||
// mnode-dnode
|
// mnode-dnode
|
||||||
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340)
|
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340)
|
||||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
|
typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
|
||||||
typedef int32_t (*_equal_fn_t)(const void*, const void*, uint32_t len);
|
typedef int32_t (*_equal_fn_t)(const void*, const void*, size_t len);
|
||||||
typedef void (*_hash_before_fn_t)(void *);
|
typedef void (*_hash_before_fn_t)(void *);
|
||||||
typedef void (*_hash_free_fn_t)(void *);
|
typedef void (*_hash_free_fn_t)(void *);
|
||||||
|
|
||||||
|
|
|
@ -53,8 +53,8 @@ static void registerRequest(SRequestObj* pRequest) {
|
||||||
|
|
||||||
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
|
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
|
||||||
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
|
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
|
||||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
|
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self,
|
||||||
pRequest->pTscObj->id, num, currentInst, total);
|
pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -371,7 +371,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
||||||
taos_close(pTscObj);
|
taos_close(pTscObj);
|
||||||
pTscObj = NULL;
|
pTscObj = NULL;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
|
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,14 +441,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
* There is not response callback function for submit response.
|
* There is not response callback function for submit response.
|
||||||
* The actual inserted number of points is the first number.
|
* The actual inserted number of points is the first number.
|
||||||
*/
|
*/
|
||||||
|
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
|
||||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId,
|
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->requestId,
|
||||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
|
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||||
pRequest->metric.rsp - pRequest->metric.start);
|
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId,
|
tscError("reqId:0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x"PRIx64, pRequest->requestId,
|
||||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
|
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||||
pRequest->metric.rsp - pRequest->metric.start);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||||
|
|
|
@ -57,95 +57,95 @@ TEST(testCase, connect_Test) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
//TEST(testCase, create_user_Test) {
|
TEST(testCase, create_user_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
|
||||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
|
||||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//TEST(testCase, create_account_Test) {
|
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
|
||||||
// assert(pConn != NULL);
|
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
|
||||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
|
||||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//TEST(testCase, drop_account_Test) {
|
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
|
||||||
// assert(pConn != NULL);
|
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
|
||||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
|
||||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//TEST(testCase, show_user_Test) {
|
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||||
// assert(pConn != NULL);
|
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||||
//
|
}
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "show users");
|
|
||||||
// TAOS_ROW pRow = NULL;
|
|
||||||
//
|
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
|
||||||
//
|
|
||||||
// char str[512] = {0};
|
|
||||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
|
||||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
|
||||||
// printf("%s\n", str);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//TEST(testCase, drop_user_Test) {
|
taos_free_result(pRes);
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
taos_close(pConn);
|
||||||
// assert(pConn != NULL);
|
}
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
|
||||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
|
||||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//TEST(testCase, show_db_Test) {
|
TEST(testCase, create_account_Test) {
|
||||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
//// assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
//
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "show databases");
|
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||||
// TAOS_ROW pRow = NULL;
|
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||||
//
|
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
}
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
|
||||||
//
|
taos_free_result(pRes);
|
||||||
// char str[512] = {0};
|
taos_close(pConn);
|
||||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
}
|
||||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
|
||||||
// printf("%s\n", str);
|
TEST(testCase, drop_account_Test) {
|
||||||
// }
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
//
|
assert(pConn != NULL);
|
||||||
// taos_close(pConn);
|
|
||||||
//}
|
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||||
|
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||||
|
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, show_user_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||||
|
TAOS_ROW pRow = NULL;
|
||||||
|
|
||||||
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
|
||||||
|
char str[512] = {0};
|
||||||
|
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||||
|
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||||
|
printf("%s\n", str);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, drop_user_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
||||||
|
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||||
|
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, show_db_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
// assert(pConn != NULL);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||||
|
TAOS_ROW pRow = NULL;
|
||||||
|
|
||||||
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
|
||||||
|
char str[512] = {0};
|
||||||
|
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||||
|
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||||
|
printf("%s\n", str);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_close(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
TEST(testCase, create_db_Test) {
|
TEST(testCase, create_db_Test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
@ -500,22 +500,17 @@ TEST(testCase, create_multiple_tables) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(testCase, generated_request_id_test) {
|
TEST(testCase, generated_request_id_test) {
|
||||||
uint64_t id0 = generateRequestId();
|
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
uint64_t id1 = generateRequestId();
|
for(int32_t i = 0; i < 1000000; ++i) {
|
||||||
uint64_t id2 = generateRequestId();
|
uint64_t v = generateRequestId();
|
||||||
uint64_t id3 = generateRequestId();
|
void* result = taosHashGet(phash, &v, sizeof(v));
|
||||||
uint64_t id4 = generateRequestId();
|
ASSERT_EQ(result, nullptr);
|
||||||
|
|
||||||
ASSERT_NE(id0, id1);
|
taosHashPut(phash, &v, sizeof(v), NULL, 0);
|
||||||
ASSERT_NE(id1, id2);
|
}
|
||||||
ASSERT_NE(id2, id3);
|
|
||||||
ASSERT_NE(id4, id3);
|
|
||||||
ASSERT_NE(id0, id2);
|
|
||||||
ASSERT_NE(id0, id4);
|
|
||||||
ASSERT_NE(id0, id3);
|
|
||||||
|
|
||||||
// SHashObj *phash = taosHashInit()
|
taosHashClear(phash);
|
||||||
}
|
}
|
||||||
|
|
||||||
//TEST(testCase, projection_query_tables) {
|
//TEST(testCase, projection_query_tables) {
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
||||||
add_subdirectory(acct)
|
|
||||||
# add_subdirectory(auth)
|
# add_subdirectory(auth)
|
||||||
# add_subdirectory(balance)
|
# add_subdirectory(balance)
|
||||||
add_subdirectory(cluster)
|
add_subdirectory(cluster)
|
||||||
|
@ -17,7 +16,6 @@ add_subdirectory(stb)
|
||||||
# add_subdirectory(sync)
|
# add_subdirectory(sync)
|
||||||
# add_subdirectory(telem)
|
# add_subdirectory(telem)
|
||||||
# add_subdirectory(trans)
|
# add_subdirectory(trans)
|
||||||
add_subdirectory(user)
|
|
||||||
add_subdirectory(vgroup)
|
add_subdirectory(vgroup)
|
||||||
|
|
||||||
add_subdirectory(sut)
|
add_subdirectory(sut)
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
aux_source_directory(. ACCT_SRC)
|
|
||||||
add_executable(dnode_test_acct ${ACCT_SRC})
|
|
||||||
target_link_libraries(
|
|
||||||
dnode_test_acct
|
|
||||||
PUBLIC sut
|
|
||||||
)
|
|
||||||
|
|
||||||
add_test(
|
|
||||||
NAME dnode_test_acct
|
|
||||||
COMMAND dnode_test_acct
|
|
||||||
)
|
|
|
@ -1,11 +0,0 @@
|
||||||
aux_source_directory(. USER_SRC)
|
|
||||||
add_executable(dnode_test_user ${USER_SRC})
|
|
||||||
target_link_libraries(
|
|
||||||
dnode_test_user
|
|
||||||
PUBLIC sut
|
|
||||||
)
|
|
||||||
|
|
||||||
add_test(
|
|
||||||
NAME dnode_test_user
|
|
||||||
COMMAND dnode_test_user
|
|
||||||
)
|
|
|
@ -8,7 +8,12 @@ target_include_directories(
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
mnode
|
mnode
|
||||||
PRIVATE sdb
|
PRIVATE sdb
|
||||||
|
PRIVATE wal
|
||||||
PRIVATE transport
|
PRIVATE transport
|
||||||
PRIVATE cjson
|
PRIVATE cjson
|
||||||
PRIVATE sync
|
PRIVATE sync
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if(${BUILD_TEST})
|
||||||
|
add_subdirectory(test)
|
||||||
|
endif(${BUILD_TEST})
|
|
@ -17,11 +17,13 @@
|
||||||
#define _TD_MND_INT_H_
|
#define _TD_MND_INT_H_
|
||||||
|
|
||||||
#include "mndDef.h"
|
#include "mndDef.h"
|
||||||
|
|
||||||
#include "sdb.h"
|
#include "sdb.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
#include "tep.h"
|
#include "tep.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -65,6 +67,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
sem_t syncSem;
|
sem_t syncSem;
|
||||||
|
SWal *pWal;
|
||||||
SSyncNode *pSyncNode;
|
SSyncNode *pSyncNode;
|
||||||
ESyncState state;
|
ESyncState state;
|
||||||
} SSyncMgmt;
|
} SSyncMgmt;
|
||||||
|
|
|
@ -354,7 +354,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStatus->dnodeId == 0) {
|
if (pStatus->dnodeId == 0) {
|
||||||
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
|
mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
|
||||||
} else {
|
} else {
|
||||||
if (pStatus->clusterId != pMnode->clusterId) {
|
if (pStatus->clusterId != pMnode->clusterId) {
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
|
|
|
@ -152,7 +152,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta);
|
int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta);
|
||||||
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow->numOfRows,
|
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d type:%s, result:%s", pShow->id, pShow->numOfRows,
|
||||||
pShow->numOfColumns, mndShowStr(type), tstrerror(code));
|
pShow->numOfColumns, mndShowStr(type), tstrerror(code));
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -15,11 +15,110 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
static int32_t mndInitWal(SMnode *pMnode) {
|
||||||
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
|
||||||
|
char path[PATH_MAX] = {0};
|
||||||
|
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
|
||||||
|
SWalCfg cfg = {.vgId = 1,
|
||||||
|
.fsyncPeriod = 0,
|
||||||
|
.rollPeriod = -1,
|
||||||
|
.segSize = -1,
|
||||||
|
.retentionPeriod = -1,
|
||||||
|
.retentionSize = -1,
|
||||||
|
.level = TAOS_WAL_FSYNC};
|
||||||
|
pMgmt->pWal = walOpen(path, &cfg);
|
||||||
|
if (pMgmt->pWal == NULL) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCloseWal(SMnode *pMnode) {
|
||||||
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
if (pMgmt->pWal != NULL) {
|
||||||
|
walClose(pMgmt->pWal);
|
||||||
|
pMgmt->pWal = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
|
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
SWalReadHandle *pHandle = walOpenReadHandle(pWal);
|
||||||
|
if (pHandle == NULL) return -1;
|
||||||
|
|
||||||
|
int64_t first = walGetFirstVer(pWal);
|
||||||
|
int64_t last = walGetLastVer(pWal);
|
||||||
|
mDebug("restore sdb wal start, sdb ver:%" PRId64 ", wal first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
|
||||||
|
|
||||||
|
first = MAX(lastSdbVer + 1, first);
|
||||||
|
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
|
||||||
|
if (walReadWithHandle(pHandle, ver) < 0) {
|
||||||
|
mError("failed to read by wal handle since %s, ver:%" PRId64, terrstr(), ver);
|
||||||
|
goto WAL_RESTORE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWalHead *pHead = pHandle->pHead;
|
||||||
|
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
|
if (sdbVer + 1 != ver) {
|
||||||
|
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
|
||||||
|
mError("failed to read wal from sdb, sdbVer:%" PRId64 " inconsistent with ver:%" PRId64, sdbVer, ver);
|
||||||
|
goto WAL_RESTORE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) {
|
||||||
|
mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver);
|
||||||
|
goto WAL_RESTORE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbUpdateVer(pSdb, 1);
|
||||||
|
mDebug("wal:%" PRId64 ", is restored", ver);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
|
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
|
if (walBeginSnapshot(pWal, sdbVer) < 0) {
|
||||||
|
goto WAL_RESTORE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sdbVer != lastSdbVer) {
|
||||||
|
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
||||||
|
if (sdbWriteFile(pSdb) != 0) {
|
||||||
|
goto WAL_RESTORE_OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (walEndSnapshot(pWal) < 0) {
|
||||||
|
goto WAL_RESTORE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
WAL_RESTORE_OVER:
|
||||||
|
walCloseReadHandle(pHandle);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndInitSync(SMnode *pMnode) {
|
int32_t mndInitSync(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||||
|
|
||||||
|
if (mndInitWal(pMnode) < 0) {
|
||||||
|
mError("failed to open wal since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndRestoreWal(pMnode) < 0) {
|
||||||
|
mError("failed to restore wal since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
pMgmt->state = TAOS_SYNC_STATE_LEADER;
|
pMgmt->state = TAOS_SYNC_STATE_LEADER;
|
||||||
pMgmt->pSyncNode = NULL;
|
pMgmt->pSyncNode = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -28,6 +127,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
void mndCleanupSync(SMnode *pMnode) {
|
void mndCleanupSync(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
tsem_destroy(&pMgmt->syncSem);
|
tsem_destroy(&pMgmt->syncSem);
|
||||||
|
mndCloseWal(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
|
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
|
||||||
|
@ -41,6 +141,20 @@ static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSync
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||||
|
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
int64_t ver = sdbUpdateVer(pSdb, 1);
|
||||||
|
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
||||||
|
sdbUpdateVer(pSdb, -1);
|
||||||
|
mError("failed to write raw:%p since %s, ver:%" PRId64, pRaw, terrstr(), ver);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mTrace("raw:%p, write to wal, ver:%" PRId64, pRaw, ver);
|
||||||
|
walCommit(pWal, ver);
|
||||||
|
walFsync(pWal, true);
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
return 0;
|
return 0;
|
||||||
#else
|
#else
|
||||||
|
|
|
@ -169,7 +169,7 @@ TRANS_ENCODE_OVER:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
|
mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
|
||||||
return pRaw;
|
return pRaw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,6 +226,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
||||||
pData = malloc(dataLen);
|
pData = malloc(dataLen);
|
||||||
if (pData == NULL) goto TRANS_DECODE_OVER;
|
if (pData == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
mTrace("raw:%p, is created", pData);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
|
||||||
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
|
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
|
||||||
pData = NULL;
|
pData = NULL;
|
||||||
|
@ -235,6 +236,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
||||||
pData = malloc(dataLen);
|
pData = malloc(dataLen);
|
||||||
if (pData == NULL) goto TRANS_DECODE_OVER;
|
if (pData == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
mTrace("raw:%p, is created", pData);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
|
||||||
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
|
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
|
||||||
pData = NULL;
|
pData = NULL;
|
||||||
|
@ -243,6 +245,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
for (int32_t i = 0; i < commitLogNum; ++i) {
|
for (int32_t i = 0; i < commitLogNum; ++i) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
||||||
pData = malloc(dataLen);
|
pData = malloc(dataLen);
|
||||||
|
if (pData == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
mTrace("raw:%p, is created", pData);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
|
||||||
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
|
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
|
||||||
pData = NULL;
|
pData = NULL;
|
||||||
|
@ -284,13 +288,13 @@ TRANS_DECODE_OVER:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("trans:%d, decode from raw:%p, data:%p", pTrans->id, pRaw, pTrans);
|
mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
||||||
pTrans->stage = TRN_STAGE_PREPARE;
|
pTrans->stage = TRN_STAGE_PREPARE;
|
||||||
mTrace("trans:%d, perform insert action, data:%p", pTrans->id, pTrans);
|
mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,13 +307,13 @@ static void mndTransDropData(STrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
|
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
|
||||||
mTrace("trans:%d, perform delete action, data:%p", pTrans->id, pTrans);
|
mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans);
|
||||||
mndTransDropData(pTrans);
|
mndTransDropData(pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
|
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
|
||||||
mTrace("trans:%d, perform update action, data:%p", pOldTrans->id, pOldTrans);
|
mTrace("trans:%d, perform update action, old_row:%p new_row:%p", pOldTrans->id, pOldTrans, pNewTrans);
|
||||||
pOldTrans->stage = pNewTrans->stage;
|
pOldTrans->stage = pNewTrans->stage;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,7 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
|
||||||
|
|
||||||
for (int32_t s = pos; s >= 0; s--) {
|
for (int32_t s = pos; s >= 0; s--) {
|
||||||
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
|
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
|
||||||
mDebug("step:%s will cleanup", pStep->name);
|
mDebug("%s will cleanup", pStep->name);
|
||||||
if (pStep->cleanupFp != NULL) {
|
if (pStep->cleanupFp != NULL) {
|
||||||
(*pStep->cleanupFp)(pMnode);
|
(*pStep->cleanupFp)(pMnode);
|
||||||
}
|
}
|
||||||
|
@ -204,12 +204,12 @@ static int32_t mndExecSteps(SMnode *pMnode) {
|
||||||
|
|
||||||
if ((*pStep->initFp)(pMnode) != 0) {
|
if ((*pStep->initFp)(pMnode) != 0) {
|
||||||
int32_t code = terrno;
|
int32_t code = terrno;
|
||||||
mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr());
|
mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
|
||||||
mndCleanupSteps(pMnode, pos);
|
mndCleanupSteps(pMnode, pos);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
mDebug("step:%s is initialized", pStep->name);
|
mDebug("%s is initialized", pStep->name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,7 +357,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
||||||
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
|
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
|
mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,7 +365,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
||||||
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
|
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr());
|
mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
|
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
|
||||||
|
@ -374,12 +374,12 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
|
||||||
pMsg->rpcMsg = *pRpcMsg;
|
pMsg->rpcMsg = *pRpcMsg;
|
||||||
pMsg->createdTime = taosGetTimestampSec();
|
pMsg->createdTime = taosGetTimestampSec();
|
||||||
|
|
||||||
mTrace("msg:%p, app:%p is created, RPC:%p", pMsg, pRpcMsg->ahandle, pRpcMsg->handle);
|
mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMsg->user);
|
||||||
return pMsg;
|
return pMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndCleanupMsg(SMnodeMsg *pMsg) {
|
void mndCleanupMsg(SMnodeMsg *pMsg) {
|
||||||
mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
|
mTrace("msg:%p, is destroyed, app:%p RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle);
|
||||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
pMsg->rpcMsg.pCont = NULL;
|
pMsg->rpcMsg.pCont = NULL;
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
@ -397,37 +397,37 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
|
||||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||||
bool isReq = (msgType & 1U);
|
bool isReq = (msgType & 1U);
|
||||||
|
|
||||||
mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, TMSG_INFO(msgType));
|
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
|
||||||
|
|
||||||
if (isReq && !mndIsMaster(pMnode)) {
|
if (isReq && !mndIsMaster(pMnode)) {
|
||||||
code = TSDB_CODE_APP_NOT_READY;
|
code = TSDB_CODE_APP_NOT_READY;
|
||||||
mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
|
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||||
goto PROCESS_RPC_END;
|
goto PROCESS_RPC_END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isReq && pMsg->rpcMsg.pCont == NULL) {
|
if (isReq && pMsg->rpcMsg.pCont == NULL) {
|
||||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||||
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
|
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||||
goto PROCESS_RPC_END;
|
goto PROCESS_RPC_END;
|
||||||
}
|
}
|
||||||
|
|
||||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
|
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
mError("msg:%p, app:%p failed to process since no handle", pMsg, ahandle);
|
mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
|
||||||
goto PROCESS_RPC_END;
|
goto PROCESS_RPC_END;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = (*fp)(pMsg);
|
code = (*fp)(pMsg);
|
||||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
mTrace("msg:%p, app:%p in progressing", pMsg, ahandle);
|
mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
|
||||||
return;
|
return;
|
||||||
} else if (code != 0) {
|
} else if (code != 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
|
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||||
goto PROCESS_RPC_END;
|
goto PROCESS_RPC_END;
|
||||||
} else {
|
} else {
|
||||||
mTrace("msg:%p, app:%p is processed", pMsg, ahandle);
|
mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
PROCESS_RPC_END:
|
PROCESS_RPC_END:
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
enable_testing()
|
||||||
|
|
||||||
|
add_subdirectory(acct)
|
||||||
|
add_subdirectory(user)
|
|
@ -0,0 +1,11 @@
|
||||||
|
aux_source_directory(. ACCT_SRC)
|
||||||
|
add_executable(mnode_test_acct ${ACCT_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
mnode_test_acct
|
||||||
|
PUBLIC sut
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME mnode_test_acct
|
||||||
|
COMMAND mnode_test_acct
|
||||||
|
)
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* @file acct.cpp
|
* @file acct.cpp
|
||||||
* @author slguan (slguan@taosdata.com)
|
* @author slguan (slguan@taosdata.com)
|
||||||
* @brief DNODE module acct-msg tests
|
* @brief MNODE module acct-msg tests
|
||||||
* @version 0.1
|
* @version 0.1
|
||||||
* @date 2021-12-15
|
* @date 2021-12-15
|
||||||
*
|
*
|
||||||
|
@ -13,7 +13,7 @@
|
||||||
|
|
||||||
class DndTestAcct : public ::testing::Test {
|
class DndTestAcct : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_acct", 9012); }
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_acct", 9012); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
|
@ -0,0 +1,11 @@
|
||||||
|
aux_source_directory(. USER_SRC)
|
||||||
|
add_executable(mnode_test_user ${USER_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
mnode_test_user
|
||||||
|
PUBLIC sut
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME mnode_test_user
|
||||||
|
COMMAND mnode_test_user
|
||||||
|
)
|
|
@ -1,7 +1,7 @@
|
||||||
/**
|
/**
|
||||||
* @file user.cpp
|
* @file user.cpp
|
||||||
* @author slguan (slguan@taosdata.com)
|
* @author slguan (slguan@taosdata.com)
|
||||||
* @brief DNODE module user-msg tests
|
* @brief MNODE module user-msg tests
|
||||||
* @version 0.1
|
* @version 0.1
|
||||||
* @date 2021-12-15
|
* @date 2021-12-15
|
||||||
*
|
*
|
||||||
|
@ -13,7 +13,7 @@
|
||||||
|
|
||||||
class DndTestUser : public ::testing::Test {
|
class DndTestUser : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_user", 9140); }
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_user", 9140); }
|
||||||
static void TearDownTestSuite() { test.Cleanup(); }
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
static Testbase test;
|
static Testbase test;
|
|
@ -17,11 +17,12 @@
|
||||||
#define _TD_SDB_INT_H_
|
#define _TD_SDB_INT_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "sdb.h"
|
#include "sdb.h"
|
||||||
#include "tmsg.h"
|
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -59,7 +60,8 @@ typedef struct SSdb {
|
||||||
char *tmpDir;
|
char *tmpDir;
|
||||||
int64_t lastCommitVer;
|
int64_t lastCommitVer;
|
||||||
int64_t curVer;
|
int64_t curVer;
|
||||||
int32_t maxId[SDB_MAX];
|
int64_t tableVer[SDB_MAX];
|
||||||
|
int64_t maxId[SDB_MAX];
|
||||||
EKeyType keyTypes[SDB_MAX];
|
EKeyType keyTypes[SDB_MAX];
|
||||||
SHashObj *hashObjs[SDB_MAX];
|
SHashObj *hashObjs[SDB_MAX];
|
||||||
SRWLatch locks[SDB_MAX];
|
SRWLatch locks[SDB_MAX];
|
||||||
|
@ -71,8 +73,6 @@ typedef struct SSdb {
|
||||||
SdbDecodeFp decodeFps[SDB_MAX];
|
SdbDecodeFp decodeFps[SDB_MAX];
|
||||||
} SSdb;
|
} SSdb;
|
||||||
|
|
||||||
int32_t sdbWriteFile(SSdb *pSdb);
|
|
||||||
|
|
||||||
const char *sdbTableName(ESdbType type);
|
const char *sdbTableName(ESdbType type);
|
||||||
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
||||||
|
|
||||||
|
|
|
@ -49,8 +49,13 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
|
|
||||||
for (ESdbType i = 0; i < SDB_MAX; ++i) {
|
for (ESdbType i = 0; i < SDB_MAX; ++i) {
|
||||||
taosInitRWLatch(&pSdb->locks[i]);
|
taosInitRWLatch(&pSdb->locks[i]);
|
||||||
|
pSdb->maxId[i] = 0;
|
||||||
|
pSdb->tableVer[i] = -1;
|
||||||
|
pSdb->keyTypes[i] = SDB_KEY_INT32;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSdb->curVer = -1;
|
||||||
|
pSdb->lastCommitVer = -1;
|
||||||
pSdb->pMnode = pOption->pMnode;
|
pSdb->pMnode = pOption->pMnode;
|
||||||
mDebug("sdb init successfully");
|
mDebug("sdb init successfully");
|
||||||
return pSdb;
|
return pSdb;
|
||||||
|
@ -59,10 +64,10 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
void sdbCleanup(SSdb *pSdb) {
|
void sdbCleanup(SSdb *pSdb) {
|
||||||
mDebug("start to cleanup sdb");
|
mDebug("start to cleanup sdb");
|
||||||
|
|
||||||
// if (pSdb->curVer != pSdb->lastCommitVer) {
|
if (pSdb->curVer != pSdb->lastCommitVer) {
|
||||||
mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
|
mDebug("write sdb file for current ver:%" PRId64 " != last commit ver:%" PRId64, pSdb->curVer, pSdb->lastCommitVer);
|
||||||
sdbWriteFile(pSdb);
|
sdbWriteFile(pSdb);
|
||||||
// }
|
}
|
||||||
|
|
||||||
if (pSdb->currDir != NULL) {
|
if (pSdb->currDir != NULL) {
|
||||||
tfree(pSdb->currDir);
|
tfree(pSdb->currDir);
|
||||||
|
@ -133,7 +138,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
pSdb->maxId[sdbType] = 0;
|
pSdb->maxId[sdbType] = 0;
|
||||||
pSdb->hashObjs[sdbType] = hash;
|
pSdb->hashObjs[sdbType] = hash;
|
||||||
taosInitRWLatch(&pSdb->locks[sdbType]);
|
taosInitRWLatch(&pSdb->locks[sdbType]);
|
||||||
mDebug("sdb table:%d is initialized", sdbType);
|
mDebug("sdb table:%s is initialized", sdbTableName(sdbType));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -159,3 +164,8 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) {
|
||||||
|
pSdb->curVer += val;
|
||||||
|
return pSdb->curVer;
|
||||||
|
}
|
|
@ -17,10 +17,13 @@
|
||||||
#include "sdbInt.h"
|
#include "sdbInt.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
|
|
||||||
|
#define SDB_TABLE_SIZE 24
|
||||||
|
#define SDB_RESERVE_SIZE 512
|
||||||
|
|
||||||
static int32_t sdbRunDeployFp(SSdb *pSdb) {
|
static int32_t sdbRunDeployFp(SSdb *pSdb) {
|
||||||
mDebug("start to deploy sdb");
|
mDebug("start to deploy sdb");
|
||||||
|
|
||||||
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
|
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
|
||||||
SdbDeployFp fp = pSdb->deployFps[i];
|
SdbDeployFp fp = pSdb->deployFps[i];
|
||||||
if (fp == NULL) continue;
|
if (fp == NULL) continue;
|
||||||
|
|
||||||
|
@ -34,6 +37,100 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t sdbReadFileHead(SSdb *pSdb, FileFd fd) {
|
||||||
|
int32_t ret = taosReadFile(fd, &pSdb->curVer, sizeof(int64_t));
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (ret != sizeof(int64_t)) {
|
||||||
|
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||||
|
int64_t maxId = -1;
|
||||||
|
ret = taosReadFile(fd, &maxId, sizeof(int64_t));
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (ret != sizeof(int64_t)) {
|
||||||
|
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (i < SDB_MAX) {
|
||||||
|
pSdb->maxId[i] = maxId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||||
|
int64_t ver = -1;
|
||||||
|
ret = taosReadFile(fd, &ver, sizeof(int64_t));
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (ret != sizeof(int64_t)) {
|
||||||
|
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (i < SDB_MAX) {
|
||||||
|
pSdb->tableVer[i] = ver;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char reserve[SDB_RESERVE_SIZE] = {0};
|
||||||
|
ret = taosReadFile(fd, reserve, sizeof(reserve));
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (ret != sizeof(reserve)) {
|
||||||
|
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t sdbWriteFileHead(SSdb *pSdb, FileFd fd) {
|
||||||
|
if (taosWriteFile(fd, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||||
|
int64_t maxId = -1;
|
||||||
|
if (i < SDB_MAX) {
|
||||||
|
maxId = pSdb->maxId[i];
|
||||||
|
}
|
||||||
|
if (taosWriteFile(fd, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||||
|
int64_t ver = -1;
|
||||||
|
if (i < SDB_MAX) {
|
||||||
|
ver = pSdb->tableVer[i];
|
||||||
|
}
|
||||||
|
if (taosWriteFile(fd, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char reserve[SDB_RESERVE_SIZE] = {0};
|
||||||
|
if (taosWriteFile(fd, reserve, sizeof(reserve)) != sizeof(reserve)) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t sdbReadFile(SSdb *pSdb) {
|
int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -43,12 +140,13 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
|
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
|
||||||
if (pRaw == NULL) {
|
if (pRaw == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mError("failed read file since %s", terrstr());
|
mError("failed read file since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char file[PATH_MAX] = {0};
|
char file[PATH_MAX] = {0};
|
||||||
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
mDebug("start to read file:%s", file);
|
||||||
|
|
||||||
FileFd fd = taosOpenFileRead(file);
|
FileFd fd = taosOpenFileRead(file);
|
||||||
if (fd <= 0) {
|
if (fd <= 0) {
|
||||||
|
@ -58,6 +156,14 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sdbReadFileHead(pSdb, fd) != 0) {
|
||||||
|
mError("failed to read file:%s head since %s", file, terrstr());
|
||||||
|
pSdb->curVer = -1;
|
||||||
|
free(pRaw);
|
||||||
|
taosCloseFile(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
readLen = sizeof(SSdbRaw);
|
readLen = sizeof(SSdbRaw);
|
||||||
ret = taosReadFile(fd, pRaw, readLen);
|
ret = taosReadFile(fd, pRaw, readLen);
|
||||||
|
@ -104,6 +210,8 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
pSdb->lastCommitVer = pSdb->curVer;
|
||||||
|
mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
|
||||||
|
|
||||||
PARSE_SDB_DATA_ERROR:
|
PARSE_SDB_DATA_ERROR:
|
||||||
taosCloseFile(fd);
|
taosCloseFile(fd);
|
||||||
|
@ -130,11 +238,17 @@ int32_t sdbWriteFile(SSdb *pSdb) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
|
if (sdbWriteFileHead(pSdb, fd) != 0) {
|
||||||
|
mError("failed to write file:%s head since %s", tmpfile, terrstr());
|
||||||
|
taosCloseFile(fd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
|
||||||
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
|
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
|
||||||
if (encodeFp == NULL) continue;
|
if (encodeFp == NULL) continue;
|
||||||
|
|
||||||
mTrace("sdb write %s, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
|
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
|
||||||
|
|
||||||
SHashObj *hash = pSdb->hashObjs[i];
|
SHashObj *hash = pSdb->hashObjs[i];
|
||||||
SRWLatch *pLock = &pSdb->locks[i];
|
SRWLatch *pLock = &pSdb->locks[i];
|
||||||
|
@ -155,7 +269,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
|
||||||
pRaw->status = pRow->status;
|
pRaw->status = pRow->status;
|
||||||
int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
|
int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen;
|
||||||
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
|
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosHashCancelIterate(hash, ppRow);
|
taosHashCancelIterate(hash, ppRow);
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
break;
|
break;
|
||||||
|
@ -163,7 +277,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
|
||||||
|
|
||||||
int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
|
int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
|
||||||
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
|
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
|
||||||
code = TAOS_SYSTEM_ERROR(terrno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
taosHashCancelIterate(hash, ppRow);
|
taosHashCancelIterate(hash, ppRow);
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
break;
|
break;
|
||||||
|
@ -201,7 +315,8 @@ int32_t sdbWriteFile(SSdb *pSdb) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
mDebug("write file:%s successfully", curfile);
|
pSdb->lastCommitVer = pSdb->curVer;
|
||||||
|
mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
|
@ -38,6 +38,10 @@ const char *sdbTableName(ESdbType type) {
|
||||||
return "auth";
|
return "auth";
|
||||||
case SDB_ACCT:
|
case SDB_ACCT:
|
||||||
return "acct";
|
return "acct";
|
||||||
|
case SDB_CONSUMER:
|
||||||
|
return "consumer";
|
||||||
|
case SDB_CGROUP:
|
||||||
|
return "cgroup";
|
||||||
case SDB_TOPIC:
|
case SDB_TOPIC:
|
||||||
return "topic";
|
return "topic";
|
||||||
case SDB_VGROUP:
|
case SDB_VGROUP:
|
||||||
|
@ -53,24 +57,41 @@ const char *sdbTableName(ESdbType type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char *sdbStatusStr(ESdbStatus status) {
|
||||||
|
switch (status) {
|
||||||
|
case SDB_STATUS_CREATING:
|
||||||
|
return "creating";
|
||||||
|
case SDB_STATUS_UPDATING:
|
||||||
|
return "updating";
|
||||||
|
case SDB_STATUS_DROPPING:
|
||||||
|
return "dropping";
|
||||||
|
case SDB_STATUS_READY:
|
||||||
|
return "ready";
|
||||||
|
case SDB_STATUS_DROPPED:
|
||||||
|
return "dropped";
|
||||||
|
default:
|
||||||
|
return "undefine";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
|
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
|
||||||
EKeyType keyType = pSdb->keyTypes[pRow->type];
|
EKeyType keyType = pSdb->keyTypes[pRow->type];
|
||||||
|
|
||||||
if (keyType == SDB_KEY_BINARY) {
|
if (keyType == SDB_KEY_BINARY) {
|
||||||
mTrace("%s:%s, refCount:%d oper:%s row:%p", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper,
|
mTrace("%s:%s, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount,
|
||||||
pRow->pObj);
|
oper, pRow->pObj, sdbStatusStr(pRow->status));
|
||||||
} else if (keyType == SDB_KEY_INT32) {
|
} else if (keyType == SDB_KEY_INT32) {
|
||||||
mTrace("%s:%d, refCount:%d oper:%s row:%p", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount, oper,
|
mTrace("%s:%d, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj,
|
||||||
pRow->pObj);
|
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
|
||||||
} else if (keyType == SDB_KEY_INT64) {
|
} else if (keyType == SDB_KEY_INT64) {
|
||||||
mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
|
mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
|
||||||
pRow->refCount, oper, pRow->pObj);
|
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
|
static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
|
||||||
if (type >= SDB_MAX || type <= SDB_START) {
|
if (type >= SDB_MAX || type < 0) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
|
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -100,8 +121,6 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
||||||
taosWLockLatch(pLock);
|
taosWLockLatch(pLock);
|
||||||
|
|
||||||
|
@ -126,10 +145,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
|
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
|
|
||||||
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
|
int32_t code = 0;
|
||||||
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
|
|
||||||
}
|
|
||||||
|
|
||||||
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
|
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
|
||||||
if (insertFp != NULL) {
|
if (insertFp != NULL) {
|
||||||
code = (*insertFp)(pSdb, pRow->pObj);
|
code = (*insertFp)(pSdb, pRow->pObj);
|
||||||
|
@ -143,12 +159,18 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
|
||||||
|
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
|
||||||
|
}
|
||||||
|
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT64) {
|
||||||
|
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
|
||||||
|
}
|
||||||
|
pSdb->tableVer[pRow->type]++;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
|
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SRWLatch *pLock = &pSdb->locks[pNewRow->type];
|
SRWLatch *pLock = &pSdb->locks[pNewRow->type];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
|
@ -157,23 +179,25 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
|
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
|
||||||
}
|
}
|
||||||
SSdbRow *pOldRow = *ppOldRow;
|
|
||||||
|
|
||||||
|
SSdbRow *pOldRow = *ppOldRow;
|
||||||
pOldRow->status = pRaw->status;
|
pOldRow->status = pRaw->status;
|
||||||
|
sdbPrintOper(pSdb, pOldRow, "updateRow");
|
||||||
taosRUnLockLatch(pLock);
|
taosRUnLockLatch(pLock);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
|
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
|
||||||
if (updateFp != NULL) {
|
if (updateFp != NULL) {
|
||||||
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
|
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbFreeRow(pSdb, pNewRow);
|
sdbFreeRow(pSdb, pNewRow);
|
||||||
|
|
||||||
|
pSdb->tableVer[pOldRow->type]++;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
||||||
taosWLockLatch(pLock);
|
taosWLockLatch(pLock);
|
||||||
|
|
||||||
|
@ -187,12 +211,15 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
SSdbRow *pOldRow = *ppOldRow;
|
SSdbRow *pOldRow = *ppOldRow;
|
||||||
|
|
||||||
pOldRow->status = pRaw->status;
|
pOldRow->status = pRaw->status;
|
||||||
|
sdbPrintOper(pSdb, pOldRow, "deleteRow");
|
||||||
|
|
||||||
taosHashRemove(hash, pOldRow->pObj, keySize);
|
taosHashRemove(hash, pOldRow->pObj, keySize);
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
|
|
||||||
// sdbRelease(pSdb, pOldRow->pObj);
|
pSdb->tableVer[pOldRow->type]++;
|
||||||
sdbFreeRow(pSdb, pRow);
|
sdbFreeRow(pSdb, pRow);
|
||||||
return code;
|
// sdbRelease(pSdb, pOldRow->pObj);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
|
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
|
||||||
|
@ -277,7 +304,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
|
||||||
if (pObj == NULL) return;
|
if (pObj == NULL) return;
|
||||||
|
|
||||||
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
|
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
|
||||||
if (pRow->type >= SDB_MAX || pRow->type <= SDB_START) return;
|
if (pRow->type >= SDB_MAX ) return;
|
||||||
|
|
||||||
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
||||||
taosRLockLatch(pLock);
|
taosRLockLatch(pLock);
|
||||||
|
|
|
@ -675,28 +675,26 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const
|
||||||
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
|
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) {
|
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) {
|
||||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
|
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupInfo vgroupInfo = {0};
|
SVgroupInfo vgroupInfo = {0};
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo));
|
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
|
||||||
|
|
||||||
STableMetaOutput output = {0};
|
STableMetaOutput output = {0};
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output));
|
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
|
||||||
|
|
||||||
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
|
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
tfree(output.tbMeta);
|
tfree(output.tbMeta);
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ extern "C" {
|
||||||
|
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
|
|
||||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
|
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
|
||||||
*/
|
*/
|
||||||
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||||
|
|
||||||
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
|
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
|
||||||
|
|
|
@ -501,7 +501,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
||||||
struct SVCreateTbReq req = {0};
|
struct SVCreateTbReq req = {0};
|
||||||
req.type = TD_CHILD_TABLE;
|
req.type = TD_CHILD_TABLE;
|
||||||
req.name = strdup(tNameGetTableName(&tableName));
|
req.name = strdup(tNameGetTableName(&tableName));
|
||||||
req.ctbCfg.suid = pSuperTableMeta->suid;
|
req.ctbCfg.suid = pSuperTableMeta->uid;
|
||||||
req.ctbCfg.pTag = row;
|
req.ctbCfg.pTag = row;
|
||||||
|
|
||||||
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
|
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
|
||||||
|
@ -548,11 +548,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
||||||
taosArrayPush(pBufArray, &pVgData);
|
taosArrayPush(pBufArray, &pVgData);
|
||||||
} while (true);
|
} while (true);
|
||||||
|
|
||||||
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
|
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
|
||||||
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
|
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
|
||||||
pStmtInfo->pDataBlocks = pBufArray;
|
pStmtInfo->pDataBlocks = pBufArray;
|
||||||
*pOutput = pStmtInfo;
|
|
||||||
*len = sizeof(SInsertStmtInfo);
|
*pOutput = (char*) pStmtInfo;
|
||||||
|
*len = sizeof(SVnodeModifOpStmtInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -823,14 +824,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
|
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
|
||||||
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
||||||
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
|
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
|
||||||
|
|
||||||
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
|
||||||
SMsgBuf* pMsgBuf = &m;
|
SMsgBuf* pMsgBuf = &m;
|
||||||
|
|
||||||
SInsertStmtInfo* pInsertStmt = NULL;
|
SVnodeModifOpStmtInfo* pInsertStmt = NULL;
|
||||||
|
|
||||||
int32_t msgLen = 0;
|
int32_t msgLen = 0;
|
||||||
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
|
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
|
||||||
|
|
|
@ -61,7 +61,7 @@ typedef struct SInsertParseContext {
|
||||||
SArray* pTableDataBlocks; // global
|
SArray* pTableDataBlocks; // global
|
||||||
SArray* pVgDataBlocks; // global
|
SArray* pVgDataBlocks; // global
|
||||||
int32_t totalNum;
|
int32_t totalNum;
|
||||||
SInsertStmtInfo* pOutput;
|
SVnodeModifOpStmtInfo* pOutput;
|
||||||
} SInsertParseContext;
|
} SInsertParseContext;
|
||||||
|
|
||||||
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
||||||
|
@ -611,7 +611,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
// [(field1_name, ...)]
|
// [(field1_name, ...)]
|
||||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||||
// [...];
|
// [...];
|
||||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
|
||||||
SInsertParseContext context = {
|
SInsertParseContext context = {
|
||||||
.pComCxt = pContext,
|
.pComCxt = pContext,
|
||||||
.pSql = (char*) pContext->pSql,
|
.pSql = (char*) pContext->pSql,
|
||||||
|
@ -620,7 +620,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
||||||
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
|
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
|
||||||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||||
.totalNum = 0,
|
.totalNum = 0,
|
||||||
.pOutput = calloc(1, sizeof(SInsertStmtInfo))
|
.pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo))
|
||||||
};
|
};
|
||||||
|
|
||||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
||||||
|
|
|
@ -53,7 +53,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (toVnode) {
|
if (toVnode) {
|
||||||
SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
|
SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
|
||||||
if (pInsertInfo == NULL) {
|
if (pInsertInfo == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -87,7 +87,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
|
|
||||||
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||||
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||||
return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery);
|
return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery);
|
||||||
} else {
|
} else {
|
||||||
return parseQuerySql(pCxt, pQuery);
|
return parseQuerySql(pCxt, pQuery);
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ protected:
|
||||||
return code_;
|
return code_;
|
||||||
}
|
}
|
||||||
|
|
||||||
SInsertStmtInfo* reslut() {
|
SVnodeModifOpStmtInfo* reslut() {
|
||||||
return res_;
|
return res_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,7 +128,7 @@ private:
|
||||||
char sqlBuf_[max_sql_len];
|
char sqlBuf_[max_sql_len];
|
||||||
SParseContext cxt_;
|
SParseContext cxt_;
|
||||||
int32_t code_;
|
int32_t code_;
|
||||||
SInsertStmtInfo* res_;
|
SVnodeModifOpStmtInfo* res_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// INSERT INTO tb_name VALUES (field1_value, ...)
|
// INSERT INTO tb_name VALUES (field1_value, ...)
|
||||||
|
|
|
@ -38,7 +38,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||||
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
|
SVnodeModifOpStmtInfo* pInsert = (SVnodeModifOpStmtInfo*)pNode;
|
||||||
|
|
||||||
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
||||||
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
||||||
|
|
|
@ -43,15 +43,12 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
|
||||||
bMsg->header.vgId = htonl(bInput->vgId);
|
bMsg->header.vgId = htonl(bInput->vgId);
|
||||||
|
|
||||||
if (bInput->dbName) {
|
if (bInput->dbName) {
|
||||||
strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname));
|
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname));
|
||||||
bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
|
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname));
|
||||||
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
|
|
||||||
|
|
||||||
*msgLen = (int32_t)sizeof(*bMsg);
|
*msgLen = (int32_t)sizeof(*bMsg);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,7 +208,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
|
||||||
|
|
||||||
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
|
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
|
||||||
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
|
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
|
||||||
pTableMeta->uid = msg->suid;
|
pTableMeta->uid = msg->tuid;
|
||||||
pTableMeta->suid = msg->suid;
|
pTableMeta->suid = msg->suid;
|
||||||
pTableMeta->sversion = msg->sversion;
|
pTableMeta->sversion = msg->sversion;
|
||||||
pTableMeta->tversion = msg->tversion;
|
pTableMeta->tversion = msg->tversion;
|
||||||
|
@ -272,7 +269,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
||||||
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
|
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
|
||||||
}
|
}
|
||||||
|
|
||||||
code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta);
|
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -1094,13 +1094,16 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
SRpcReqContext *pContext;
|
SRpcReqContext *pContext;
|
||||||
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
|
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
|
||||||
|
|
||||||
|
char ipstr[24] = {0};
|
||||||
|
taosIpPort2String(pRecv->ip, pRecv->port, ipstr);
|
||||||
|
|
||||||
if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
|
if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
|
||||||
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||||
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
|
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen,
|
||||||
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
||||||
} else {
|
} else {
|
||||||
tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||||
pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
|
pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen,
|
||||||
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
#include "taoserror.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -19,8 +19,10 @@
|
||||||
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
|
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
|
||||||
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
|
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
|
||||||
if (pRead == NULL) {
|
if (pRead == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->pWal = pWal;
|
pRead->pWal = pWal;
|
||||||
pRead->readIdxTfd = -1;
|
pRead->readIdxTfd = -1;
|
||||||
pRead->readLogTfd = -1;
|
pRead->readLogTfd = -1;
|
||||||
|
|
|
@ -148,7 +148,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
|
||||||
|
|
||||||
int32_t walEndSnapshot(SWal *pWal) {
|
int32_t walEndSnapshot(SWal *pWal) {
|
||||||
int64_t ver = pWal->vers.verInSnapshotting;
|
int64_t ver = pWal->vers.verInSnapshotting;
|
||||||
if (ver == -1) return -1;
|
if (ver == -1) return 0;
|
||||||
|
|
||||||
pWal->vers.snapshotVer = ver;
|
pWal->vers.snapshotVer = ver;
|
||||||
int ts = taosGetTimestampSec();
|
int ts = taosGetTimestampSec();
|
||||||
|
|
|
@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_WAl_VER, "Invalid wal version")
|
||||||
|
|
||||||
// mnode-dnode
|
// mnode-dnode
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "Dnode already exists")
|
||||||
|
|
|
@ -28,13 +28,13 @@
|
||||||
tfree(_n); \
|
tfree(_n); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define FREE_HASH_NODE(_h, _n) \
|
#define FREE_HASH_NODE(_h, _n) \
|
||||||
do { \
|
do { \
|
||||||
if ((_h)->freeFp) { \
|
if ((_h)->freeFp) { \
|
||||||
(_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
|
(_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
|
||||||
} \
|
} \
|
||||||
\
|
\
|
||||||
DO_FREE_HASH_NODE(_n); \
|
DO_FREE_HASH_NODE(_n); \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
|
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
|
||||||
|
@ -55,7 +55,6 @@ static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) {
|
||||||
if (type == HASH_NO_LOCK) {
|
if (type == HASH_NO_LOCK) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(lock);
|
taosRUnLockLatch(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +62,6 @@ static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) {
|
||||||
if (type == HASH_NO_LOCK) {
|
if (type == HASH_NO_LOCK) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(lock);
|
taosWUnLockLatch(lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,12 +223,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
||||||
|
|
||||||
// need the resize process, write lock applied
|
// need the resize process, write lock applied
|
||||||
if (HASH_NEED_RESIZE(pHashObj)) {
|
if (HASH_NEED_RESIZE(pHashObj)) {
|
||||||
__wr_lock(&pHashObj->lock, pHashObj->type);
|
__wr_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
taosHashTableResize(pHashObj);
|
taosHashTableResize(pHashObj);
|
||||||
__wr_unlock(&pHashObj->lock, pHashObj->type);
|
__wr_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pe = pHashObj->hashList[slot];
|
SHashEntry *pe = pHashObj->hashList[slot];
|
||||||
|
@ -272,7 +270,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable resize
|
// enable resize
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
atomic_add_fetch_32(&pHashObj->size, 1);
|
atomic_add_fetch_32(&pHashObj->size, 1);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -289,7 +287,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
||||||
}
|
}
|
||||||
|
|
||||||
// enable resize
|
// enable resize
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
return pHashObj->enableUpdate ? 0 : -2;
|
return pHashObj->enableUpdate ? 0 : -2;
|
||||||
}
|
}
|
||||||
|
@ -308,14 +306,14 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
|
||||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||||
|
|
||||||
// only add the read lock to disable the resize process
|
// only add the read lock to disable the resize process
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pe = pHashObj->hashList[slot];
|
SHashEntry *pe = pHashObj->hashList[slot];
|
||||||
|
|
||||||
// no data, return directly
|
// no data, return directly
|
||||||
if (atomic_load_32(&pe->num) == 0) {
|
if (atomic_load_32(&pe->num) == 0) {
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -358,7 +356,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
|
||||||
taosRUnLockLatch(&pe->latch);
|
taosRUnLockLatch(&pe->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -370,14 +368,14 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
|
||||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||||
|
|
||||||
// only add the read lock to disable the resize process
|
// only add the read lock to disable the resize process
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pe = pHashObj->hashList[slot];
|
SHashEntry *pe = pHashObj->hashList[slot];
|
||||||
|
|
||||||
// no data, return directly
|
// no data, return directly
|
||||||
if (atomic_load_32(&pe->num) == 0) {
|
if (atomic_load_32(&pe->num) == 0) {
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,7 +413,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
|
||||||
taosRUnLockLatch(&pe->latch);
|
taosRUnLockLatch(&pe->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -436,7 +434,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
|
||||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||||
|
|
||||||
// disable the resize process
|
// disable the resize process
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pe = pHashObj->hashList[slot];
|
SHashEntry *pe = pHashObj->hashList[slot];
|
||||||
|
@ -450,7 +448,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
|
||||||
assert(pe->next == NULL);
|
assert(pe->next == NULL);
|
||||||
taosWUnLockLatch(&pe->latch);
|
taosWUnLockLatch(&pe->latch);
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,7 +489,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
|
||||||
taosWUnLockLatch(&pe->latch);
|
taosWUnLockLatch(&pe->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -502,7 +500,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
|
||||||
}
|
}
|
||||||
|
|
||||||
// disable the resize process
|
// disable the resize process
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
int32_t numOfEntries = (int32_t)pHashObj->capacity;
|
int32_t numOfEntries = (int32_t)pHashObj->capacity;
|
||||||
for (int32_t i = 0; i < numOfEntries; ++i) {
|
for (int32_t i = 0; i < numOfEntries; ++i) {
|
||||||
|
@ -566,7 +564,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -577,7 +575,7 @@ void taosHashClear(SHashObj *pHashObj) {
|
||||||
|
|
||||||
SHashNode *pNode, *pNext;
|
SHashNode *pNode, *pNext;
|
||||||
|
|
||||||
__wr_lock(&pHashObj->lock, pHashObj->type);
|
__wr_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||||
|
@ -601,7 +599,7 @@ void taosHashClear(SHashObj *pHashObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_32(&pHashObj->size, 0);
|
atomic_store_32(&pHashObj->size, 0);
|
||||||
__wr_unlock(&pHashObj->lock, pHashObj->type);
|
__wr_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosHashCleanup(SHashObj *pHashObj) {
|
void taosHashCleanup(SHashObj *pHashObj) {
|
||||||
|
@ -864,7 +862,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
||||||
char *data = NULL;
|
char *data = NULL;
|
||||||
|
|
||||||
// only add the read lock to disable the resize process
|
// only add the read lock to disable the resize process
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
SHashNode *pNode = NULL;
|
SHashNode *pNode = NULL;
|
||||||
if (p) {
|
if (p) {
|
||||||
|
@ -911,7 +909,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
return data;
|
return data;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -920,7 +918,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
||||||
if (pHashObj == NULL || p == NULL) return;
|
if (pHashObj == NULL || p == NULL) return;
|
||||||
|
|
||||||
// only add the read lock to disable the resize process
|
// only add the read lock to disable the resize process
|
||||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
|
|
||||||
int slot;
|
int slot;
|
||||||
taosHashReleaseNode(pHashObj, p, &slot);
|
taosHashReleaseNode(pHashObj, p, &slot);
|
||||||
|
@ -930,7 +928,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
||||||
taosWUnLockLatch(&pe->latch);
|
taosWUnLockLatch(&pe->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosHashRelease(SHashObj *pHashObj, void *p) {
|
void taosHashRelease(SHashObj *pHashObj, void *p) {
|
||||||
|
|
|
@ -46,7 +46,6 @@ print =============== create child table
|
||||||
sql create table c1 using st tags(1)
|
sql create table c1 using st tags(1)
|
||||||
sql create table c2 using st tags(2)
|
sql create table c2 using st tags(2)
|
||||||
|
|
||||||
return
|
|
||||||
sql show tables
|
sql show tables
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
|
@ -56,6 +55,8 @@ print $data00 $data01 $data02
|
||||||
print $data10 $data11 $data22
|
print $data10 $data11 $data22
|
||||||
print $data20 $data11 $data22
|
print $data20 $data11 $data22
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
print =============== insert data
|
print =============== insert data
|
||||||
sql insert into c1 values(now+1s, 1)
|
sql insert into c1 values(now+1s, 1)
|
||||||
sql insert into c1 values(now+2s, 2)
|
sql insert into c1 values(now+2s, 2)
|
||||||
|
|
Loading…
Reference in New Issue