Merge remote-tracking branch 'origin/3.0' into feature/dnode3

This commit is contained in:
Shengliang Guan 2022-01-05 23:14:39 -08:00
commit 11a07e7dc3
3 changed files with 36 additions and 32 deletions

View File

@ -535,9 +535,7 @@ void* doFetchRow(SRequestObj* pRequest) {
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body);
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} }

View File

@ -281,18 +281,18 @@ TEST(testCase, use_db_test) {
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
//
TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)"); // pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
taos_free_result(pRes); // taos_free_result(pRes);
//
taos_close(pConn); // taos_close(pConn);
} //}
//TEST(testCase, create_ctable_Test) { //TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -505,15 +505,15 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 1000; ++i) { // for(int32_t i = 0; i < 1000; ++i) {
char sql[512] = {0}; // char sql[512] = {0};
snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); // snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
TAOS_RES* pres = taos_query(pConn, sql); // TAOS_RES* pres = taos_query(pConn, sql);
if (taos_errno(pres) != 0) { // if (taos_errno(pres) != 0) {
printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); // printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
} // }
taos_free_result(pres); // taos_free_result(pres);
} // }
taos_close(pConn); taos_close(pConn);
} }

View File

@ -352,11 +352,12 @@ _return:
int32_t schProcessOnJobPartialSuccess(SSchJob *job) { int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
bool needFetch = job->userFetch;
if ((!job->attr.needFetch) && job->attr.syncSchedule) { if ((!job->attr.needFetch) && job->attr.syncSchedule) {
tsem_post(&job->rspSem); tsem_post(&job->rspSem);
} }
if (job->userFetch) { if (needFetch) {
SCH_ERR_RET(schFetchFromRemote(job)); SCH_ERR_RET(schFetchFromRemote(job));
} }
@ -426,7 +427,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
} }
job->fetchTask = task; job->fetchTask = task;
SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); SCH_ERR_RET(schProcessOnJobPartialSuccess(job));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -501,7 +501,6 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else { } else {
// job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task); code = schProcessOnTaskSuccess(job, task);
if (code) { if (code) {
goto _task_error; goto _task_error;
@ -835,8 +834,6 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
// int32_t msgType = (plan->type == QUERY_TYPE_MODIFY)? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
@ -860,8 +857,13 @@ void schDropJobAllTasks(SSchJob *job) {
void *pIter = taosHashIterate(job->succTasks, NULL); void *pIter = taosHashIterate(job->succTasks, NULL);
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
int32_t msgType = task->plan->msgType;
if (msgType == TDMT_VND_CREATE_TABLE || msgType == TDMT_VND_SUBMIT) {
break;
}
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter); pIter = taosHashIterate(job->succTasks, pIter);
} }
@ -869,8 +871,12 @@ void schDropJobAllTasks(SSchJob *job) {
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); int32_t msgType = task->plan->msgType;
if (msgType == TDMT_VND_CREATE_TABLE || msgType == TDMT_VND_SUBMIT) {
break;
}
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter); pIter = taosHashIterate(job->succTasks, pIter);
} }
} }