Merge pull request #9576 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
97fb365252
|
@ -90,12 +90,12 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
|
|||
/**
|
||||
* Force renew a table's local cached meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
* @param pRpc (input, rpc object)
|
||||
* @param pTransporter (input, rpc object)
|
||||
* @param pMgmtEps (input, mnode EPs)
|
||||
* @param pTableName (input, table name, NOT including db name)
|
||||
* @return error code
|
||||
*/
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName);
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
|
||||
|
||||
/**
|
||||
* Force renew a table's local cached meta data and get the new one.
|
||||
|
|
|
@ -154,14 +154,14 @@ typedef struct SVgDataBlocks {
|
|||
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
|
||||
} SVgDataBlocks;
|
||||
|
||||
typedef struct SInsertStmtInfo {
|
||||
typedef struct SVnodeModifOpStmtInfo {
|
||||
int16_t nodeType;
|
||||
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
||||
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||
const char* sql; // current sql statement position
|
||||
} SInsertStmtInfo;
|
||||
} SVnodeModifOpStmtInfo;
|
||||
|
||||
typedef struct SDclStmtInfo {
|
||||
int16_t nodeType;
|
||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
|||
#include "tlockfree.h"
|
||||
|
||||
typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
|
||||
typedef int32_t (*_equal_fn_t)(const void*, const void*, uint32_t len);
|
||||
typedef int32_t (*_equal_fn_t)(const void*, const void*, size_t len);
|
||||
typedef void (*_hash_before_fn_t)(void *);
|
||||
typedef void (*_hash_free_fn_t)(void *);
|
||||
|
||||
|
|
|
@ -53,8 +53,8 @@ static void registerRequest(SRequestObj* pRequest) {
|
|||
|
||||
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
|
||||
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
|
||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
|
||||
pRequest->pTscObj->id, num, currentInst, total);
|
||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self,
|
||||
pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -371,7 +371,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
|
|||
taos_close(pTscObj);
|
||||
pTscObj = NULL;
|
||||
} else {
|
||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
|
||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
|
||||
destroyRequest(pRequest);
|
||||
}
|
||||
|
||||
|
@ -441,14 +441,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
* There is not response callback function for submit response.
|
||||
* The actual inserted number of points is the first number.
|
||||
*/
|
||||
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
|
||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
|
||||
pRequest->metric.rsp - pRequest->metric.start);
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->requestId,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
|
||||
pRequest->metric.rsp - pRequest->metric.start);
|
||||
tscError("reqId:0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x"PRIx64, pRequest->requestId,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||
}
|
||||
|
||||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||
|
|
|
@ -57,95 +57,95 @@ TEST(testCase, connect_Test) {
|
|||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, create_user_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, create_account_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, drop_account_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
TEST(testCase, create_user_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
//TEST(testCase, show_user_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
// TAOS_ROW pRow = NULL;
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
//TEST(testCase, drop_user_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
//TEST(testCase, show_db_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
//// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||
// TAOS_ROW pRow = NULL;
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
TEST(testCase, create_account_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_account_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_user_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_user_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_db_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_db_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -500,22 +500,17 @@ TEST(testCase, create_multiple_tables) {
|
|||
}
|
||||
|
||||
TEST(testCase, generated_request_id_test) {
|
||||
uint64_t id0 = generateRequestId();
|
||||
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
|
||||
uint64_t id1 = generateRequestId();
|
||||
uint64_t id2 = generateRequestId();
|
||||
uint64_t id3 = generateRequestId();
|
||||
uint64_t id4 = generateRequestId();
|
||||
for(int32_t i = 0; i < 1000000; ++i) {
|
||||
uint64_t v = generateRequestId();
|
||||
void* result = taosHashGet(phash, &v, sizeof(v));
|
||||
ASSERT_EQ(result, nullptr);
|
||||
|
||||
ASSERT_NE(id0, id1);
|
||||
ASSERT_NE(id1, id2);
|
||||
ASSERT_NE(id2, id3);
|
||||
ASSERT_NE(id4, id3);
|
||||
ASSERT_NE(id0, id2);
|
||||
ASSERT_NE(id0, id4);
|
||||
ASSERT_NE(id0, id3);
|
||||
taosHashPut(phash, &v, sizeof(v), NULL, 0);
|
||||
}
|
||||
|
||||
// SHashObj *phash = taosHashInit()
|
||||
taosHashClear(phash);
|
||||
}
|
||||
|
||||
//TEST(testCase, projection_query_tables) {
|
||||
|
|
|
@ -675,28 +675,26 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const
|
|||
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
|
||||
}
|
||||
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) {
|
||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
|
||||
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) {
|
||||
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo));
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
|
||||
|
||||
STableMetaOutput output = {0};
|
||||
|
||||
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output));
|
||||
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
|
||||
|
||||
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
|
||||
|
||||
_return:
|
||||
|
||||
tfree(output.tbMeta);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ extern "C" {
|
|||
|
||||
#include "parser.h"
|
||||
|
||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
|
||||
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
|
|||
*/
|
||||
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||
|
||||
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
|
||||
|
||||
/**
|
||||
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
|
||||
|
|
|
@ -501,7 +501,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
|||
struct SVCreateTbReq req = {0};
|
||||
req.type = TD_CHILD_TABLE;
|
||||
req.name = strdup(tNameGetTableName(&tableName));
|
||||
req.ctbCfg.suid = pSuperTableMeta->suid;
|
||||
req.ctbCfg.suid = pSuperTableMeta->uid;
|
||||
req.ctbCfg.pTag = row;
|
||||
|
||||
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
|
||||
|
@ -548,11 +548,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
|
|||
taosArrayPush(pBufArray, &pVgData);
|
||||
} while (true);
|
||||
|
||||
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
|
||||
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
|
||||
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
|
||||
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
|
||||
pStmtInfo->pDataBlocks = pBufArray;
|
||||
*pOutput = pStmtInfo;
|
||||
*len = sizeof(SInsertStmtInfo);
|
||||
|
||||
*pOutput = (char*) pStmtInfo;
|
||||
*len = sizeof(SVnodeModifOpStmtInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -823,14 +824,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
|
||||
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
|
||||
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
||||
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
|
||||
|
||||
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
|
||||
SMsgBuf* pMsgBuf = &m;
|
||||
|
||||
SInsertStmtInfo* pInsertStmt = NULL;
|
||||
SVnodeModifOpStmtInfo* pInsertStmt = NULL;
|
||||
|
||||
int32_t msgLen = 0;
|
||||
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
|
||||
|
|
|
@ -61,7 +61,7 @@ typedef struct SInsertParseContext {
|
|||
SArray* pTableDataBlocks; // global
|
||||
SArray* pVgDataBlocks; // global
|
||||
int32_t totalNum;
|
||||
SInsertStmtInfo* pOutput;
|
||||
SVnodeModifOpStmtInfo* pOutput;
|
||||
} SInsertParseContext;
|
||||
|
||||
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
||||
|
@ -611,7 +611,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
// [(field1_name, ...)]
|
||||
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
|
||||
// [...];
|
||||
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
||||
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
|
||||
SInsertParseContext context = {
|
||||
.pComCxt = pContext,
|
||||
.pSql = (char*) pContext->pSql,
|
||||
|
@ -620,7 +620,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
|||
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
|
||||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||
.totalNum = 0,
|
||||
.pOutput = calloc(1, sizeof(SInsertStmtInfo))
|
||||
.pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo))
|
||||
};
|
||||
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
|
||||
|
|
|
@ -53,7 +53,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
|||
}
|
||||
|
||||
if (toVnode) {
|
||||
SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
|
||||
SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
|
||||
if (pInsertInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
|||
|
||||
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
|
||||
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
|
||||
return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery);
|
||||
return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery);
|
||||
} else {
|
||||
return parseQuerySql(pCxt, pQuery);
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ protected:
|
|||
return code_;
|
||||
}
|
||||
|
||||
SInsertStmtInfo* reslut() {
|
||||
SVnodeModifOpStmtInfo* reslut() {
|
||||
return res_;
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ private:
|
|||
char sqlBuf_[max_sql_len];
|
||||
SParseContext cxt_;
|
||||
int32_t code_;
|
||||
SInsertStmtInfo* res_;
|
||||
SVnodeModifOpStmtInfo* res_;
|
||||
};
|
||||
|
||||
// INSERT INTO tb_name VALUES (field1_value, ...)
|
||||
|
|
|
@ -38,7 +38,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
|||
}
|
||||
|
||||
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
|
||||
SVnodeModifOpStmtInfo* pInsert = (SVnodeModifOpStmtInfo*)pNode;
|
||||
|
||||
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
||||
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
||||
|
|
|
@ -43,15 +43,12 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
|
|||
bMsg->header.vgId = htonl(bInput->vgId);
|
||||
|
||||
if (bInput->dbName) {
|
||||
strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname));
|
||||
bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0;
|
||||
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname));
|
||||
}
|
||||
|
||||
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
|
||||
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
|
||||
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname));
|
||||
|
||||
*msgLen = (int32_t)sizeof(*bMsg);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -211,7 +208,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
|
|||
|
||||
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
|
||||
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
|
||||
pTableMeta->uid = msg->suid;
|
||||
pTableMeta->uid = msg->tuid;
|
||||
pTableMeta->suid = msg->suid;
|
||||
pTableMeta->sversion = msg->sversion;
|
||||
pTableMeta->tversion = msg->tversion;
|
||||
|
@ -272,7 +269,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
|
|||
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
|
||||
}
|
||||
|
||||
code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta);
|
||||
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -1094,13 +1094,16 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
|||
SRpcReqContext *pContext;
|
||||
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
|
||||
|
||||
char ipstr[24] = {0};
|
||||
taosIpPort2String(pRecv->ip, pRecv->port, ipstr);
|
||||
|
||||
if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
|
||||
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
|
||||
tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen,
|
||||
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
||||
} else {
|
||||
tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||
pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
|
||||
tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
|
||||
pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen,
|
||||
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
|
||||
}
|
||||
|
||||
|
|
|
@ -28,13 +28,13 @@
|
|||
tfree(_n); \
|
||||
} while (0)
|
||||
|
||||
#define FREE_HASH_NODE(_h, _n) \
|
||||
do { \
|
||||
if ((_h)->freeFp) { \
|
||||
#define FREE_HASH_NODE(_h, _n) \
|
||||
do { \
|
||||
if ((_h)->freeFp) { \
|
||||
(_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
|
||||
} \
|
||||
\
|
||||
DO_FREE_HASH_NODE(_n); \
|
||||
} \
|
||||
\
|
||||
DO_FREE_HASH_NODE(_n); \
|
||||
} while (0);
|
||||
|
||||
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
|
||||
|
@ -55,7 +55,6 @@ static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) {
|
|||
if (type == HASH_NO_LOCK) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosRUnLockLatch(lock);
|
||||
}
|
||||
|
||||
|
@ -63,7 +62,6 @@ static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) {
|
|||
if (type == HASH_NO_LOCK) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosWUnLockLatch(lock);
|
||||
}
|
||||
|
||||
|
@ -225,12 +223,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
|
||||
// need the resize process, write lock applied
|
||||
if (HASH_NEED_RESIZE(pHashObj)) {
|
||||
__wr_lock(&pHashObj->lock, pHashObj->type);
|
||||
__wr_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
taosHashTableResize(pHashObj);
|
||||
__wr_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__wr_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
}
|
||||
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
SHashEntry *pe = pHashObj->hashList[slot];
|
||||
|
@ -272,7 +270,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
}
|
||||
|
||||
// enable resize
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
atomic_add_fetch_32(&pHashObj->size, 1);
|
||||
|
||||
return 0;
|
||||
|
@ -289,7 +287,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
}
|
||||
|
||||
// enable resize
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
return pHashObj->enableUpdate ? 0 : -2;
|
||||
}
|
||||
|
@ -308,14 +306,14 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
|
|||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
// only add the read lock to disable the resize process
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
SHashEntry *pe = pHashObj->hashList[slot];
|
||||
|
||||
// no data, return directly
|
||||
if (atomic_load_32(&pe->num) == 0) {
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -358,7 +356,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
|
|||
taosRUnLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return data;
|
||||
}
|
||||
|
||||
|
@ -370,14 +368,14 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
|
|||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
// only add the read lock to disable the resize process
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
SHashEntry *pe = pHashObj->hashList[slot];
|
||||
|
||||
// no data, return directly
|
||||
if (atomic_load_32(&pe->num) == 0) {
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -415,7 +413,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
|
|||
taosRUnLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return data;
|
||||
}
|
||||
|
||||
|
@ -436,7 +434,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
|
|||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
// disable the resize process
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
SHashEntry *pe = pHashObj->hashList[slot];
|
||||
|
@ -450,7 +448,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
|
|||
assert(pe->next == NULL);
|
||||
taosWUnLockLatch(&pe->latch);
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -491,7 +489,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
|
|||
taosWUnLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -502,7 +500,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
|
|||
}
|
||||
|
||||
// disable the resize process
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
int32_t numOfEntries = (int32_t)pHashObj->capacity;
|
||||
for (int32_t i = 0; i < numOfEntries; ++i) {
|
||||
|
@ -566,7 +564,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
|
|||
}
|
||||
}
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -577,7 +575,7 @@ void taosHashClear(SHashObj *pHashObj) {
|
|||
|
||||
SHashNode *pNode, *pNext;
|
||||
|
||||
__wr_lock(&pHashObj->lock, pHashObj->type);
|
||||
__wr_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
|
@ -601,7 +599,7 @@ void taosHashClear(SHashObj *pHashObj) {
|
|||
}
|
||||
|
||||
atomic_store_32(&pHashObj->size, 0);
|
||||
__wr_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__wr_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
}
|
||||
|
||||
void taosHashCleanup(SHashObj *pHashObj) {
|
||||
|
@ -864,7 +862,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
|||
char *data = NULL;
|
||||
|
||||
// only add the read lock to disable the resize process
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
SHashNode *pNode = NULL;
|
||||
if (p) {
|
||||
|
@ -911,7 +909,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
|||
}
|
||||
}
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
return data;
|
||||
|
||||
}
|
||||
|
@ -920,7 +918,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
|||
if (pHashObj == NULL || p == NULL) return;
|
||||
|
||||
// only add the read lock to disable the resize process
|
||||
__rd_lock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
|
||||
|
||||
int slot;
|
||||
taosHashReleaseNode(pHashObj, p, &slot);
|
||||
|
@ -930,7 +928,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
|||
taosWUnLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
|
||||
}
|
||||
|
||||
void taosHashRelease(SHashObj *pHashObj, void *p) {
|
||||
|
|
Loading…
Reference in New Issue