fix create table error while use multi dnodes

This commit is contained in:
Shengliang Guan 2022-01-18 19:50:18 -08:00
parent 8b808fbccc
commit 9a5f4f2f44
2 changed files with 60 additions and 58 deletions

View File

@ -16,9 +16,18 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndAuth.h" #include "mndAuth.h"
int32_t mndInitAuth(SMnode *pMnode) { return 0; } static int32_t mndProcessAuthReq(SMnodeMsg *pReq);
void mndCleanupAuth(SMnode *pMnode) {}
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mndInitAuth(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_AUTH, mndProcessAuthReq);
return 0;
}
void mndCleanupAuth(SMnode *pMnode) {}
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; }
static int32_t mndProcessAuthReq(SMnodeMsg *pReq) {
mDebug("user:%s, auth req is processed", pReq->user);
return 0; return 0;
} }

View File

@ -46,9 +46,9 @@ typedef struct {
pthread_t thread; pthread_t thread;
} SThreadInfo; } SThreadInfo;
//void parseArgument(int32_t argc, char *argv[]); // void parseArgument(int32_t argc, char *argv[]);
//void *threadFunc(void *param); // void *threadFunc(void *param);
//void createDbAndStb(); // void createDbAndStb();
void createDbAndStb() { void createDbAndStb() {
pPrint("start to create db and stable"); pPrint("start to create db and stable");
@ -64,7 +64,8 @@ void createDbAndStb() {
TAOS_RES *pRes = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
int32_t code = taos_errno(pRes); int32_t code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes), taos_errstr(pRes)); pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes),
taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -129,10 +130,9 @@ static int64_t getResult(TAOS_RES *tres) {
return numOfRows; return numOfRows;
} }
void showTables() {
void showTables() {
pPrint("start to show tables"); pPrint("start to show tables");
char qstr[32]; char qstr[128];
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) { if (con == NULL) {
@ -140,9 +140,9 @@ void showTables() {
exit(1); exit(1);
} }
sprintf(qstr, "use %s", dbName); snprintf(qstr, 128, "use %s", dbName);
TAOS_RES *pRes = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
int code = taos_errno(pRes); int code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes)); pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes));
exit(1); exit(1);
@ -160,12 +160,11 @@ void showTables() {
int64_t totalTableNum = getResult(pRes); int64_t totalTableNum = getResult(pRes);
taos_free_result(pRes); taos_free_result(pRes);
pPrint("%s database: %s, total %" PRId64 " tables %s", GREEN, dbName, totalTableNum, NC); pPrint("%s database: %s, total %" PRId64 " tables %s", GREEN, dbName, totalTableNum, NC);
taos_close(con); taos_close(con);
} }
void *threadFunc(void *param) { void *threadFunc(void *param) {
SThreadInfo *pInfo = (SThreadInfo *)param; SThreadInfo *pInfo = (SThreadInfo *)param;
char *qstr = malloc(2000 * 1000); char *qstr = malloc(2000 * 1000);
@ -177,48 +176,48 @@ void *threadFunc(void *param) {
exit(1); exit(1);
} }
//printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex); // printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex,
// pInfo->tableEndIndex);
sprintf(qstr, "use %s", pInfo->dbName); sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pRes = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
taos_free_result(pRes); taos_free_result(pRes);
if (createTable) { if (createTable) {
int64_t curMs = 0; int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs(); int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = beginMs; pInfo->startMs = beginMs;
int64_t t = pInfo->tableBeginIndex; int64_t t = pInfo->tableBeginIndex;
for (; t <= pInfo->tableEndIndex;) { for (; t <= pInfo->tableEndIndex;) {
//int64_t batch = (pInfo->tableEndIndex - t); // int64_t batch = (pInfo->tableEndIndex - t);
//batch = MIN(batch, batchNum); // batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table"); int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batchNum;) { for (int32_t i = 0; i < batchNum;) {
len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t); len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t);
t++; t++;
i++; i++;
if (t > pInfo->tableEndIndex) { if (t > pInfo->tableEndIndex) {
break; break;
} }
} }
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
TAOS_RES *pRes = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
} }
taos_free_result(pRes); taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs(); int64_t endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs; int64_t delay = endTs - startTs;
//printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay); // printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if (delay > pInfo->maxDelay) pInfo->maxDelay = delay; if (delay > pInfo->maxDelay) pInfo->maxDelay = delay;
if (delay < pInfo->minDelay) pInfo->minDelay = delay; if (delay < pInfo->minDelay) pInfo->minDelay = delay;
curMs = taosGetTimestampMs(); curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) { if (curMs - beginMs > 10000) {
beginMs = curMs; beginMs = curMs;
//printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t); // printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t);
printCreateProgress(pInfo, t); printCreateProgress(pInfo, t);
} }
} }
@ -227,7 +226,8 @@ void *threadFunc(void *param) {
if (insertData) { if (insertData) {
int64_t curMs = 0; int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();; int64_t beginMs = taosGetTimestampMs();
;
pInfo->startMs = taosGetTimestampMs(); pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
@ -247,7 +247,7 @@ void *threadFunc(void *param) {
taos_free_result(pRes); taos_free_result(pRes);
curMs = taosGetTimestampMs(); curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) { if (curMs - beginMs > 10000) {
printInsertProgress(pInfo, t); printInsertProgress(pInfo, t);
} }
t += (batch - 1); t += (batch - 1);
@ -335,33 +335,32 @@ int32_t main(int32_t argc, char *argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
if (showTablesFlag) { if (showTablesFlag) {
showTables(); showTables();
return 0; return 0;
} }
createDbAndStb(); createDbAndStb();
pPrint("%d threads are spawned to create %d tables", numOfThreads, numOfThreads); pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables);
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
//int64_t numOfTablesPerThread = numOfTables / numOfThreads; // int64_t numOfTablesPerThread = numOfTables / numOfThreads;
//numOfTables = numOfTablesPerThread * numOfThreads; // numOfTables = numOfTablesPerThread * numOfThreads;
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
} }
int64_t a = numOfTables / numOfThreads; int64_t a = numOfTables / numOfThreads;
if (a < 1) { if (a < 1) {
numOfThreads = numOfTables; numOfThreads = numOfTables;
a = 1; a = 1;
} }
int64_t b = 0; int64_t b = 0;
b = numOfTables % numOfThreads; b = numOfTables % numOfThreads;
@ -371,7 +370,7 @@ int32_t main(int32_t argc, char *argv[]) {
pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1; pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pInfo[i].tableEndIndex + 1; tableFrom = pInfo[i].tableEndIndex + 1;
pInfo[i].threadIndex = i; pInfo[i].threadIndex = i;
pInfo[i].minDelay = INT64_MAX; pInfo[i].minDelay = INT64_MAX;
strcpy(pInfo[i].dbName, dbName); strcpy(pInfo[i].dbName, dbName);
strcpy(pInfo[i].stbName, stbName); strcpy(pInfo[i].stbName, stbName);
pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i));
@ -390,7 +389,7 @@ int32_t main(int32_t argc, char *argv[]) {
createTableSpeed += pInfo[i].createTableSpeed; createTableSpeed += pInfo[i].createTableSpeed;
if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay; if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay;
if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay; if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay;
} }
float insertDataSpeed = 0; float insertDataSpeed = 0;
@ -398,21 +397,15 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed; insertDataSpeed += pInfo[i].insertDataSpeed;
} }
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s", pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64
GREEN, "us %s",
numOfTables, GREEN, numOfTables, createTableSpeed, numOfThreads, maxDelay, minDelay, NC);
createTableSpeed,
numOfThreads,
maxDelay,
minDelay,
NC);
if (insertData) { if (insertData) {
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC); numOfThreads, NC);
} }
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
free(pInfo); free(pInfo);
} }