Hotfix/sangshuduo/td 4240 taosdemo subscribe morethan 100 (#6155)
* DB/create: fix create database when default quorum > replica
* [TD-4081]<fix>: fix vnode dropping
* test/sim: fix null to 0 in show dnodes;
* [TD-4081]<fix>: [v3] fix vnode closing
* [TD-4081]<fix>: vnode not close if dropped
* Revert "test/sim: fix null to 0 in show dnodes;"
This reverts commit 70213e6526
.
* vnode/write: make last write msg to be written or confirmed
* vnode/drop: ingore invalid vgroup id if already dropped
* vnode/read: use app not ready instead of invalid vgroup id to make crash_gen happy
* vnode/close: wait write 900ms
* [td-4209]<fix>: add the uid check for super table metadata that is already cached in local buffer.
* [td-4209]
* [td-225] refactor a sim script.
* Hotfix/sangshuduo/td 4136 taosdemo records morethan 32767 (#6147)
* [TD-4136]<fix>: taosdemo max records per req < 32767
* [TD-4136]<fix>: taosdemo check insert rows not more than 32767.
check insert rows for progressive.
* fix with answer_yes.
Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
* [TD-4240]<fix>: taosdemo subscribe more than max query sql count.
Co-authored-by: Minglei Jin <mljin@taosdata.com>
Co-authored-by: Minglei Jin <stephenk@gmx.us>
Co-authored-by: Shengliang Guan <slguan@taosdata.com>
Co-authored-by: Haojun Liao <hjliao@taosdata.com>
Co-authored-by: haojun Liao <hjxilinx@users.noreply.github.com>
Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
e4f135b4bd
commit
e8d2162cce
|
@ -307,7 +307,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
|
||||||
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
|
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
|
||||||
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
|
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
|
||||||
uint32_t tscGetTableMetaMaxSize();
|
uint32_t tscGetTableMetaMaxSize();
|
||||||
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, void* buf);
|
int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf);
|
||||||
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
|
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
|
||||||
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
|
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
|
||||||
|
|
||||||
|
|
|
@ -68,14 +68,16 @@ typedef struct CChildTableMeta {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
STableId id;
|
STableId id;
|
||||||
uint8_t tableType;
|
uint8_t tableType;
|
||||||
char sTableName[TSDB_TABLE_FNAME_LEN]; //super table name, not full name
|
char sTableName[TSDB_TABLE_FNAME_LEN]; // TODO: refactor super table name, not full name
|
||||||
|
uint64_t suid; // super table id
|
||||||
} CChildTableMeta;
|
} CChildTableMeta;
|
||||||
|
|
||||||
typedef struct STableMeta {
|
typedef struct STableMeta {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
STableId id;
|
STableId id;
|
||||||
uint8_t tableType;
|
uint8_t tableType;
|
||||||
char sTableName[TSDB_TABLE_FNAME_LEN];
|
char sTableName[TSDB_TABLE_FNAME_LEN]; // super table name
|
||||||
|
uint64_t suid; // super table id
|
||||||
int16_t sversion;
|
int16_t sversion;
|
||||||
int16_t tversion;
|
int16_t tversion;
|
||||||
STableComInfo tableInfo;
|
STableComInfo tableInfo;
|
||||||
|
|
|
@ -94,6 +94,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
|
||||||
|
|
||||||
pTableMeta->tableType = pTableMetaMsg->tableType;
|
pTableMeta->tableType = pTableMetaMsg->tableType;
|
||||||
pTableMeta->vgId = pTableMetaMsg->vgroup.vgId;
|
pTableMeta->vgId = pTableMetaMsg->vgroup.vgId;
|
||||||
|
pTableMeta->suid = pTableMetaMsg->suid;
|
||||||
|
|
||||||
pTableMeta->tableInfo = (STableComInfo) {
|
pTableMeta->tableInfo = (STableComInfo) {
|
||||||
.numOfTags = pTableMetaMsg->numOfTags,
|
.numOfTags = pTableMetaMsg->numOfTags,
|
||||||
|
|
|
@ -1832,8 +1832,8 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
||||||
pMetaMsg->tversion = htons(pMetaMsg->tversion);
|
pMetaMsg->tversion = htons(pMetaMsg->tversion);
|
||||||
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
||||||
|
|
||||||
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
||||||
|
pMetaMsg->suid = pMetaMsg->suid;
|
||||||
pMetaMsg->contLen = htons(pMetaMsg->contLen);
|
pMetaMsg->contLen = htons(pMetaMsg->contLen);
|
||||||
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
|
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
|
||||||
|
|
||||||
|
@ -2453,14 +2453,11 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
pTableMetaInfo->pTableMeta = (STableMeta *)tmp;
|
pTableMetaInfo->pTableMeta = (STableMeta *)tmp;
|
||||||
memset(pTableMetaInfo->pTableMeta, 0, size);
|
|
||||||
pTableMetaInfo->tableMetaSize = size;
|
|
||||||
} else {
|
|
||||||
//uint32_t s = tscGetTableMetaSize(pTableMetaInfo->pTableMeta);
|
|
||||||
memset(pTableMetaInfo->pTableMeta, 0, size);
|
|
||||||
pTableMetaInfo->tableMetaSize = size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
memset(pTableMetaInfo->pTableMeta, 0, size);
|
||||||
|
pTableMetaInfo->tableMetaSize = size;
|
||||||
|
|
||||||
pTableMetaInfo->pTableMeta->tableType = -1;
|
pTableMetaInfo->pTableMeta->tableType = -1;
|
||||||
pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1;
|
pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1;
|
||||||
|
|
||||||
|
@ -2476,8 +2473,9 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
|
||||||
|
|
||||||
STableMeta* pMeta = pTableMetaInfo->pTableMeta;
|
STableMeta* pMeta = pTableMetaInfo->pTableMeta;
|
||||||
if (pMeta->id.uid > 0) {
|
if (pMeta->id.uid > 0) {
|
||||||
|
// in case of child table, here only get the
|
||||||
if (pMeta->tableType == TSDB_CHILD_TABLE) {
|
if (pMeta->tableType == TSDB_CHILD_TABLE) {
|
||||||
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMetaInfo->pTableMeta, name, buf);
|
int32_t code = tscCreateTableMetaFromSTableMeta(pTableMetaInfo->pTableMeta, name, buf);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return getTableMetaFromMnode(pSql, pTableMetaInfo);
|
return getTableMetaFromMnode(pSql, pTableMetaInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -3370,22 +3370,25 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta) {
|
||||||
assert(pTableMeta != NULL);
|
assert(pTableMeta != NULL);
|
||||||
|
|
||||||
CChildTableMeta* cMeta = calloc(1, sizeof(CChildTableMeta));
|
CChildTableMeta* cMeta = calloc(1, sizeof(CChildTableMeta));
|
||||||
|
|
||||||
cMeta->tableType = TSDB_CHILD_TABLE;
|
cMeta->tableType = TSDB_CHILD_TABLE;
|
||||||
cMeta->vgId = pTableMeta->vgId;
|
cMeta->vgId = pTableMeta->vgId;
|
||||||
cMeta->id = pTableMeta->id;
|
cMeta->id = pTableMeta->id;
|
||||||
|
cMeta->suid = pTableMeta->suid;
|
||||||
tstrncpy(cMeta->sTableName, pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
|
tstrncpy(cMeta->sTableName, pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
||||||
return cMeta;
|
return cMeta;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, void* buf) {
|
int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf) {
|
||||||
assert(pChild != NULL && buf != NULL);
|
assert(pChild != NULL && buf != NULL);
|
||||||
|
|
||||||
// uint32_t size = tscGetTableMetaMaxSize();
|
STableMeta* p = buf;
|
||||||
STableMeta* p = buf;//calloc(1, size);
|
|
||||||
|
|
||||||
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1);
|
taosHashGetClone(tscTableMetaInfo, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, p, -1);
|
||||||
if (p->id.uid > 0) { // tableMeta exists, build child table meta and return
|
|
||||||
|
// tableMeta exists, build child table meta according to the super table meta
|
||||||
|
// the uid need to be checked in addition to the general name of the super table.
|
||||||
|
if (p->id.uid > 0 && pChild->suid == p->id.uid) {
|
||||||
pChild->sversion = p->sversion;
|
pChild->sversion = p->sversion;
|
||||||
pChild->tversion = p->tversion;
|
pChild->tversion = p->tversion;
|
||||||
|
|
||||||
|
@ -3393,13 +3396,9 @@ int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name, v
|
||||||
int32_t total = pChild->tableInfo.numOfColumns + pChild->tableInfo.numOfTags;
|
int32_t total = pChild->tableInfo.numOfColumns + pChild->tableInfo.numOfTags;
|
||||||
|
|
||||||
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
|
memcpy(pChild->schema, p->schema, sizeof(SSchema) *total);
|
||||||
|
|
||||||
// tfree(p);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else { // super table has been removed, current tableMeta is also expired. remove it here
|
} else { // super table has been removed, current tableMeta is also expired. remove it here
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
|
|
||||||
// tfree(p);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -526,6 +526,8 @@ static int taosRandom()
|
||||||
|
|
||||||
#endif // ifdef Windows
|
#endif // ifdef Windows
|
||||||
|
|
||||||
|
static void prompt();
|
||||||
|
static void prompt2();
|
||||||
static int createDatabasesAndStables();
|
static int createDatabasesAndStables();
|
||||||
static void createChildTables();
|
static void createChildTables();
|
||||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
|
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
|
||||||
|
@ -1062,10 +1064,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
||||||
printf("# Print debug info: %d\n", arguments->debug_print);
|
printf("# Print debug info: %d\n", arguments->debug_print);
|
||||||
printf("# Print verbose info: %d\n", arguments->verbose_print);
|
printf("# Print verbose info: %d\n", arguments->verbose_print);
|
||||||
printf("###################################################################\n");
|
printf("###################################################################\n");
|
||||||
if (!arguments->answer_yes) {
|
|
||||||
printf("Press enter key to continue\n\n");
|
prompt();
|
||||||
(void) getchar();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3446,10 +3446,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
g_args.interlace_rows, g_args.num_of_RPR);
|
g_args.interlace_rows, g_args.num_of_RPR);
|
||||||
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
|
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
|
||||||
g_args.num_of_RPR);
|
g_args.num_of_RPR);
|
||||||
if (!g_args.answer_yes) {
|
prompt2();
|
||||||
printf(" press Enter key to continue or Ctrl-C to stop.");
|
|
||||||
(void)getchar();
|
|
||||||
}
|
|
||||||
g_args.interlace_rows = g_args.num_of_RPR;
|
g_args.interlace_rows = g_args.num_of_RPR;
|
||||||
}
|
}
|
||||||
} else if (!interlaceRows) {
|
} else if (!interlaceRows) {
|
||||||
|
@ -3506,9 +3503,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
||||||
g_args.answer_yes = false;
|
g_args.answer_yes = false;
|
||||||
}
|
}
|
||||||
} else if (!answerPrompt) {
|
} else if (!answerPrompt) {
|
||||||
g_args.answer_yes = false;
|
g_args.answer_yes = true; // default is no, mean answer_yes.
|
||||||
} else {
|
} else {
|
||||||
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
|
errorPrint("%s", "failed to read json, confirm_parameter_prompt input mistake\n");
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5395,6 +5392,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
int64_t start_time = pThreadInfo->start_time;
|
int64_t start_time = pThreadInfo->start_time;
|
||||||
|
|
||||||
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
|
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
|
||||||
|
|
||||||
for (uint64_t i = 0; i < insertRows;) {
|
for (uint64_t i = 0; i < insertRows;) {
|
||||||
|
@ -6095,6 +6093,21 @@ static void *readMetric(void *sarg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void prompt()
|
||||||
|
{
|
||||||
|
if (!g_args.answer_yes) {
|
||||||
|
printf("Press enter key to continue\n\n");
|
||||||
|
(void)getchar();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void prompt2()
|
||||||
|
{
|
||||||
|
if (!g_args.answer_yes) {
|
||||||
|
printf(" press Enter key to continue or Ctrl-C to stop.");
|
||||||
|
(void)getchar();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int insertTestProcess() {
|
static int insertTestProcess() {
|
||||||
|
|
||||||
|
@ -6115,10 +6128,7 @@ static int insertTestProcess() {
|
||||||
if (g_fpOfInsertResult)
|
if (g_fpOfInsertResult)
|
||||||
printfInsertMetaToFile(g_fpOfInsertResult);
|
printfInsertMetaToFile(g_fpOfInsertResult);
|
||||||
|
|
||||||
if (!g_args.answer_yes) {
|
prompt();
|
||||||
printf("Press enter key to continue\n\n");
|
|
||||||
(void)getchar();
|
|
||||||
}
|
|
||||||
|
|
||||||
init_rand_data();
|
init_rand_data();
|
||||||
|
|
||||||
|
@ -6390,10 +6400,7 @@ static int queryTestProcess() {
|
||||||
&g_queryInfo.superQueryInfo.childTblCount);
|
&g_queryInfo.superQueryInfo.childTblCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!g_args.answer_yes) {
|
prompt();
|
||||||
printf("Press enter key to continue\n\n");
|
|
||||||
(void)getchar();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (g_args.debug_print || g_args.verbose_print) {
|
if (g_args.debug_print || g_args.verbose_print) {
|
||||||
printfQuerySystemInfo(taos);
|
printfQuerySystemInfo(taos);
|
||||||
|
@ -6576,6 +6583,15 @@ static void *superSubscribe(void *sarg) {
|
||||||
if (g_queryInfo.superQueryInfo.sqlCount == 0)
|
if (g_queryInfo.superQueryInfo.sqlCount == 0)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
|
||||||
|
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
|
||||||
|
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
|
||||||
|
g_queryInfo.superQueryInfo.sqlCount,
|
||||||
|
pThreadInfo->ntables,
|
||||||
|
MAX_QUERY_SQL_COUNT);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
if (pThreadInfo->taos == NULL) {
|
if (pThreadInfo->taos == NULL) {
|
||||||
TAOS * taos = NULL;
|
TAOS * taos = NULL;
|
||||||
taos = taos_connect(g_queryInfo.host,
|
taos = taos_connect(g_queryInfo.host,
|
||||||
|
@ -6746,10 +6762,7 @@ static int subscribeTestProcess() {
|
||||||
printfQueryMeta();
|
printfQueryMeta();
|
||||||
resetAfterAnsiEscape();
|
resetAfterAnsiEscape();
|
||||||
|
|
||||||
if (!g_args.answer_yes) {
|
prompt();
|
||||||
printf("Press enter key to continue\n\n");
|
|
||||||
(void) getchar();
|
|
||||||
}
|
|
||||||
|
|
||||||
TAOS * taos = NULL;
|
TAOS * taos = NULL;
|
||||||
taos = taos_connect(g_queryInfo.host,
|
taos = taos_connect(g_queryInfo.host,
|
||||||
|
|
|
@ -389,7 +389,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->compression < 0) pCfg->compression = tsCompression;
|
if (pCfg->compression < 0) pCfg->compression = tsCompression;
|
||||||
if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL;
|
if (pCfg->walLevel < 0) pCfg->walLevel = tsWAL;
|
||||||
if (pCfg->replications < 0) pCfg->replications = tsReplications;
|
if (pCfg->replications < 0) pCfg->replications = tsReplications;
|
||||||
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
|
if (pCfg->quorum < 0) pCfg->quorum = MIN(tsQuorum, pCfg->replications);
|
||||||
if (pCfg->update < 0) pCfg->update = tsUpdate;
|
if (pCfg->update < 0) pCfg->update = tsUpdate;
|
||||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
|
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
|
||||||
if (pCfg->dbType < 0) pCfg->dbType = 0;
|
if (pCfg->dbType < 0) pCfg->dbType = 0;
|
||||||
|
|
|
@ -120,12 +120,14 @@ int32_t vnodeDrop(int32_t vgId) {
|
||||||
vDebug("vgId:%d, failed to drop, vnode not find", vgId);
|
vDebug("vgId:%d, failed to drop, vnode not find", vgId);
|
||||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
}
|
}
|
||||||
|
if (pVnode->dropped) {
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
pVnode->dropped = 1;
|
pVnode->dropped = 1;
|
||||||
|
|
||||||
// remove from hash, so new messages wont be consumed
|
|
||||||
vnodeRemoveFromHash(pVnode);
|
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
vnodeCleanupInMWorker(pVnode);
|
vnodeCleanupInMWorker(pVnode);
|
||||||
|
|
||||||
|
@ -425,6 +427,10 @@ int32_t vnodeOpen(int32_t vgId) {
|
||||||
int32_t vnodeClose(int32_t vgId) {
|
int32_t vnodeClose(int32_t vgId) {
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) return 0;
|
if (pVnode == NULL) return 0;
|
||||||
|
if (pVnode->dropped) {
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
|
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
|
||||||
vnodeRemoveFromHash(pVnode);
|
vnodeRemoveFromHash(pVnode);
|
||||||
|
@ -510,6 +516,8 @@ void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
|
|
||||||
vnodeSetClosingStatus(pVnode);
|
vnodeSetClosingStatus(pVnode);
|
||||||
|
|
||||||
|
vnodeRemoveFromHash(pVnode);
|
||||||
|
|
||||||
// stop replication module
|
// stop replication module
|
||||||
if (pVnode->sync > 0) {
|
if (pVnode->sync > 0) {
|
||||||
int64_t sync = pVnode->sync;
|
int64_t sync = pVnode->sync;
|
||||||
|
|
|
@ -117,14 +117,17 @@ static SVReadMsg *vnodeBuildVReadMsg(SVnodeObj *pVnode, void *pCont, int32_t con
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
|
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
|
||||||
|
SVnodeObj *pVnode = vparam;
|
||||||
|
if (pVnode->dropped) {
|
||||||
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
|
}
|
||||||
|
|
||||||
SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam);
|
SVReadMsg *pRead = vnodeBuildVReadMsg(vparam, pCont, contLen, qtype, rparam);
|
||||||
if (pRead == NULL) {
|
if (pRead == NULL) {
|
||||||
assert(terrno != 0);
|
assert(terrno != 0);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnodeObj *pVnode = vparam;
|
|
||||||
|
|
||||||
int32_t code = vnodeCheckRead(pVnode);
|
int32_t code = vnodeCheckRead(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosFreeQitem(pRead);
|
taosFreeQitem(pRead);
|
||||||
|
|
|
@ -66,6 +66,9 @@ static bool vnodeSetClosingStatusImp(SVnodeObj* pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
|
bool vnodeSetClosingStatus(SVnodeObj* pVnode) {
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
||||||
|
return true;
|
||||||
|
|
||||||
while (!vnodeSetClosingStatusImp(pVnode)) {
|
while (!vnodeSetClosingStatusImp(pVnode)) {
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,11 @@ void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
||||||
vTrace("vgId:%d, vnode not found while notify role", vgId);
|
vTrace("vgId:%d, vnode not found while notify role", vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (pVnode->dropped) {
|
||||||
|
vTrace("vgId:%d, vnode dropped while notify role", vgId);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
|
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
|
||||||
pVnode->role = role;
|
pVnode->role = role;
|
||||||
|
@ -75,6 +80,11 @@ void vnodeCtrlFlow(int32_t vgId, int32_t level) {
|
||||||
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
|
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (pVnode->dropped) {
|
||||||
|
vTrace("vgId:%d, vnode dropped while flow ctrl", vgId);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pVnode->flowctrlLevel != level) {
|
if (pVnode->flowctrlLevel != level) {
|
||||||
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
|
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
|
||||||
|
@ -129,6 +139,7 @@ int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rpara
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
vError("vgId:%d, vnode not found while write to cache", vgId);
|
vError("vgId:%d, vnode not found while write to cache", vgId);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -386,4 +386,6 @@ void vnodeWaitWriteCompleted(SVnodeObj *pVnode) {
|
||||||
vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
|
vTrace("vgId:%d, queued wmsg num:%d", pVnode->vgId, pVnode->queuedWMsg);
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMsleep(900);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,15 +27,15 @@ class TDTestCase:
|
||||||
def getBuildPath(self):
|
def getBuildPath(self):
|
||||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
if ("community" in selfPath):
|
if "community" in selfPath:
|
||||||
projPath = selfPath[: selfPath.find("community")]
|
projPath = selfPath[: selfPath.find("community")]
|
||||||
else:
|
else:
|
||||||
projPath = selfPath[: selfPath.find("tests")]
|
projPath = selfPath[: selfPath.find("tests")]
|
||||||
|
|
||||||
for root, dirs, files in os.walk(projPath):
|
for root, dirs, files in os.walk(projPath):
|
||||||
if ("taosd" in files):
|
if "taosd" in files:
|
||||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
if ("packaging" not in rootRealPath):
|
if "packaging" not in rootRealPath:
|
||||||
buildPath = root[: len(root) - len("/build/bin")]
|
buildPath = root[: len(root) - len("/build/bin")]
|
||||||
break
|
break
|
||||||
return buildPath
|
return buildPath
|
||||||
|
@ -43,12 +43,12 @@ class TDTestCase:
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
buildPath = self.getBuildPath()
|
buildPath = self.getBuildPath()
|
||||||
if (buildPath == ""):
|
if buildPath == "":
|
||||||
tdLog.exit("taosd not found!")
|
tdLog.exit("taosd not found!")
|
||||||
else:
|
else:
|
||||||
tdLog.info("taosd found in %s" % buildPath)
|
tdLog.info("taosd found in %s" % buildPath)
|
||||||
binPath = buildPath + "/build/bin/"
|
binPath = buildPath + "/build/bin/"
|
||||||
os.system("yes | %staosdemo -f tools/insert.json" % binPath)
|
os.system("%staosdemo -f tools/insert.json -y" % binPath)
|
||||||
|
|
||||||
tdSql.execute("use db01")
|
tdSql.execute("use db01")
|
||||||
tdSql.query("select count(*) from stb01")
|
tdSql.query("select count(*) from stb01")
|
||||||
|
|
Loading…
Reference in New Issue