TD-1057
This commit is contained in:
parent
a9cabb54fa
commit
41d97eeea7
|
@ -141,7 +141,7 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
|
||||||
SSqlObj *pSql = (SSqlObj *) tres;
|
SSqlObj *pSql = (SSqlObj *) tres;
|
||||||
assert(pSql != NULL);
|
assert(pSql != NULL);
|
||||||
|
|
||||||
sem_post(&pSql->rspSem);
|
tsem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
|
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
|
||||||
|
@ -156,7 +156,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
||||||
pSql->param = pSql;
|
pSql->param = pSql;
|
||||||
|
|
||||||
tscProcessSql(pSql);
|
tscProcessSql(pSql);
|
||||||
sem_wait(&pSql->rspSem);
|
tsem_wait(&pSql->rspSem);
|
||||||
|
|
||||||
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
if (pSql->res.code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = pSql->res.code;
|
terrno = pSql->res.code;
|
||||||
|
@ -225,12 +225,12 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
assert(tres != NULL);
|
assert(tres != NULL);
|
||||||
|
|
||||||
SSqlObj *pSql = (SSqlObj *) tres;
|
SSqlObj *pSql = (SSqlObj *) tres;
|
||||||
sem_post(&pSql->rspSem);
|
tsem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
|
||||||
SSqlObj* pSql = (SSqlObj*) tres;
|
SSqlObj* pSql = (SSqlObj*) tres;
|
||||||
sem_post(&pSql->rspSem);
|
tsem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
|
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
|
||||||
|
@ -439,7 +439,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
pCmd->command == TSDB_SQL_CLI_VERSION ||
|
pCmd->command == TSDB_SQL_CLI_VERSION ||
|
||||||
pCmd->command == TSDB_SQL_CURRENT_USER )) {
|
pCmd->command == TSDB_SQL_CURRENT_USER )) {
|
||||||
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
|
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
|
||||||
sem_wait(&pSql->rspSem);
|
tsem_wait(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
return doSetResultRowData(pSql, true);
|
return doSetResultRowData(pSql, true);
|
||||||
|
@ -729,7 +729,7 @@ static void asyncCallback(void *param, TAOS_RES *tres, int code) {
|
||||||
assert(param != NULL);
|
assert(param != NULL);
|
||||||
SSqlObj *pSql = ((SSqlObj *)param);
|
SSqlObj *pSql = ((SSqlObj *)param);
|
||||||
pSql->res.code = code;
|
pSql->res.code = code;
|
||||||
sem_post(&pSql->rspSem);
|
tsem_post(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_validate_sql(TAOS *taos, const char *sql) {
|
int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
|
@ -780,7 +780,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
pSql->param = pSql;
|
pSql->param = pSql;
|
||||||
int code = tsParseSql(pSql, true);
|
int code = tsParseSql(pSql, true);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
sem_wait(&pSql->rspSem);
|
tsem_wait(&pSql->rspSem);
|
||||||
code = pSql->res.code;
|
code = pSql->res.code;
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -33,7 +33,7 @@ typedef struct SSubscriptionProgress {
|
||||||
typedef struct SSub {
|
typedef struct SSub {
|
||||||
void * signature;
|
void * signature;
|
||||||
char topic[32];
|
char topic[32];
|
||||||
sem_t sem;
|
tsem_t sem;
|
||||||
int64_t lastSyncTime;
|
int64_t lastSyncTime;
|
||||||
int64_t lastConsumeTime;
|
int64_t lastConsumeTime;
|
||||||
TAOS * taos;
|
TAOS * taos;
|
||||||
|
@ -85,7 +85,7 @@ static void asyncCallback(void *param, TAOS_RES *tres, int code) {
|
||||||
assert(param != NULL);
|
assert(param != NULL);
|
||||||
SSub *pSub = ((SSub *)param);
|
SSub *pSub = ((SSub *)param);
|
||||||
pSub->pSql->res.code = code;
|
pSub->pSql->res.code = code;
|
||||||
sem_post(&pSub->sem);
|
tsem_post(&pSub->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
|
||||||
|
|
||||||
code = tsParseSql(pSql, false);
|
code = tsParseSql(pSql, false);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
sem_wait(&pSub->sem);
|
tsem_wait(&pSub->sem);
|
||||||
code = pSql->res.code;
|
code = pSql->res.code;
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -451,7 +451,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
pSql->fetchFp = asyncCallback;
|
pSql->fetchFp = asyncCallback;
|
||||||
pSql->param = pSub;
|
pSql->param = pSub;
|
||||||
tscDoQuery(pSql);
|
tscDoQuery(pSql);
|
||||||
sem_wait(&pSub->sem);
|
tsem_wait(&pSub->sem);
|
||||||
|
|
||||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
if (pRes->code != TSDB_CODE_SUCCESS) {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -2057,7 +2057,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildResFromSubqueries(pSql);
|
doBuildResFromSubqueries(pSql);
|
||||||
sem_post(&pSql->rspSem);
|
tsem_post(&pSql->rspSem);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -2083,7 +2083,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
|
||||||
// free(pState);
|
// free(pState);
|
||||||
//
|
//
|
||||||
// pRes->completed = true; // set query completed
|
// pRes->completed = true; // set query completed
|
||||||
// sem_post(&pSql->rspSem);
|
// tsem_post(&pSql->rspSem);
|
||||||
// return;
|
// return;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
|
|
@ -387,7 +387,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
pCmd->allocSize = 0;
|
pCmd->allocSize = 0;
|
||||||
|
|
||||||
taosTFree(pSql->sqlstr);
|
taosTFree(pSql->sqlstr);
|
||||||
sem_destroy(&pSql->rspSem);
|
tsem_destroy(&pSql->rspSem);
|
||||||
free(pSql);
|
free(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
#include "dnodeMain.h"
|
#include "dnodeMain.h"
|
||||||
|
|
||||||
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
|
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||||
static sem_t exitSem;
|
static tsem_t exitSem;
|
||||||
|
|
||||||
int32_t main(int32_t argc, char *argv[]) {
|
int32_t main(int32_t argc, char *argv[]) {
|
||||||
// Set global configuration file
|
// Set global configuration file
|
||||||
|
@ -88,7 +88,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sem_init(&exitSem, 0, 0) != 0) {
|
if (tsem_init(&exitSem, 0, 0) != 0) {
|
||||||
printf("failed to create exit semphore\n");
|
printf("failed to create exit semphore\n");
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ int32_t main(int32_t argc, char *argv[]) {
|
||||||
|
|
||||||
syslog(LOG_INFO, "Started TDengine service successfully.");
|
syslog(LOG_INFO, "Started TDengine service successfully.");
|
||||||
|
|
||||||
for (int res = sem_wait(&exitSem); res != 0; res = sem_wait(&exitSem)) {
|
for (int res = tsem_wait(&exitSem); res != 0; res = tsem_wait(&exitSem)) {
|
||||||
if (res != EINTR) {
|
if (res != EINTR) {
|
||||||
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
|
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
|
||||||
break;
|
break;
|
||||||
|
@ -157,5 +157,5 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
|
||||||
sigaction(SIGUSR2, &act, NULL);
|
sigaction(SIGUSR2, &act, NULL);
|
||||||
|
|
||||||
// inform main thread to exit
|
// inform main thread to exit
|
||||||
sem_post(&exitSem);
|
tsem_post(&exitSem);
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
#include "dnodeInt.h"
|
#include "dnodeInt.h"
|
||||||
#include "dnodeTelemetry.h"
|
#include "dnodeTelemetry.h"
|
||||||
|
|
||||||
static sem_t tsExitSem;
|
static tsem_t tsExitSem;
|
||||||
static pthread_t tsTelemetryThread;
|
static pthread_t tsTelemetryThread;
|
||||||
|
|
||||||
#define TELEMETRY_SERVER "telemetry.taosdata.com"
|
#define TELEMETRY_SERVER "telemetry.taosdata.com"
|
||||||
|
@ -266,7 +266,7 @@ int32_t dnodeInitTelemetry() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sem_init(&tsExitSem, 0, 0) == -1) {
|
if (tsem_init(&tsExitSem, 0, 0) == -1) {
|
||||||
// just log the error, it is ok for telemetry to fail
|
// just log the error, it is ok for telemetry to fail
|
||||||
dTrace("failed to create semaphore for telemetry, reason:%s", strerror(errno));
|
dTrace("failed to create semaphore for telemetry, reason:%s", strerror(errno));
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -291,8 +291,8 @@ void dnodeCleanupTelemetry() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsTelemetryThread) {
|
if (tsTelemetryThread) {
|
||||||
sem_post(&tsExitSem);
|
tsem_post(&tsExitSem);
|
||||||
pthread_join(tsTelemetryThread, NULL);
|
pthread_join(tsTelemetryThread, NULL);
|
||||||
sem_destroy(&tsExitSem);
|
tsem_destroy(&tsExitSem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -440,9 +440,9 @@ typedef struct {
|
||||||
char* cols;
|
char* cols;
|
||||||
bool use_metric;
|
bool use_metric;
|
||||||
|
|
||||||
sem_t mutex_sem;
|
tsem_t mutex_sem;
|
||||||
int notFinished;
|
int notFinished;
|
||||||
sem_t lock_sem;
|
tsem_t lock_sem;
|
||||||
} info;
|
} info;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -459,9 +459,9 @@ typedef struct {
|
||||||
int data_of_order;
|
int data_of_order;
|
||||||
int data_of_rate;
|
int data_of_rate;
|
||||||
|
|
||||||
sem_t *mutex_sem;
|
tsem_t *mutex_sem;
|
||||||
int *notFinished;
|
int *notFinished;
|
||||||
sem_t *lock_sem;
|
tsem_t *lock_sem;
|
||||||
} sTable;
|
} sTable;
|
||||||
|
|
||||||
/* ******************************* Global
|
/* ******************************* Global
|
||||||
|
@ -729,9 +729,9 @@ int main(int argc, char *argv[]) {
|
||||||
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
t_info->end_table_id = i < b ? last + a : last + a - 1;
|
||||||
last = t_info->end_table_id + 1;
|
last = t_info->end_table_id + 1;
|
||||||
|
|
||||||
sem_init(&(t_info->mutex_sem), 0, 1);
|
tsem_init(&(t_info->mutex_sem), 0, 1);
|
||||||
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
|
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
|
||||||
sem_init(&(t_info->lock_sem), 0, 0);
|
tsem_init(&(t_info->lock_sem), 0, 0);
|
||||||
|
|
||||||
if (query_mode == SYNC) {
|
if (query_mode == SYNC) {
|
||||||
pthread_create(pids + i, NULL, syncWrite, t_info);
|
pthread_create(pids + i, NULL, syncWrite, t_info);
|
||||||
|
@ -762,8 +762,8 @@ int main(int argc, char *argv[]) {
|
||||||
for (int i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
info *t_info = infos + i;
|
info *t_info = infos + i;
|
||||||
taos_close(t_info->taos);
|
taos_close(t_info->taos);
|
||||||
sem_destroy(&(t_info->mutex_sem));
|
tsem_destroy(&(t_info->mutex_sem));
|
||||||
sem_destroy(&(t_info->lock_sem));
|
tsem_destroy(&(t_info->lock_sem));
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pids);
|
free(pids);
|
||||||
|
@ -1021,8 +1021,8 @@ void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntable
|
||||||
|
|
||||||
for (int i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
info *t_info = infos + i;
|
info *t_info = infos + i;
|
||||||
sem_destroy(&(t_info->mutex_sem));
|
tsem_destroy(&(t_info->mutex_sem));
|
||||||
sem_destroy(&(t_info->lock_sem));
|
tsem_destroy(&(t_info->lock_sem));
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pids);
|
free(pids);
|
||||||
|
@ -1272,7 +1272,7 @@ void *asyncWrite(void *sarg) {
|
||||||
taos_query_a(winfo->taos, "show databases", callBack, tb_info);
|
taos_query_a(winfo->taos, "show databases", callBack, tb_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_wait(&(winfo->lock_sem));
|
tsem_wait(&(winfo->lock_sem));
|
||||||
free(tb_infos);
|
free(tb_infos);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1292,10 +1292,10 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
||||||
|
|
||||||
// If finished;
|
// If finished;
|
||||||
if (tb_info->counter >= tb_info->target) {
|
if (tb_info->counter >= tb_info->target) {
|
||||||
sem_wait(tb_info->mutex_sem);
|
tsem_wait(tb_info->mutex_sem);
|
||||||
(*(tb_info->notFinished))--;
|
(*(tb_info->notFinished))--;
|
||||||
if (*(tb_info->notFinished) == 0) sem_post(tb_info->lock_sem);
|
if (*(tb_info->notFinished) == 0) tsem_post(tb_info->lock_sem);
|
||||||
sem_post(tb_info->mutex_sem);
|
tsem_post(tb_info->mutex_sem);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -186,7 +186,7 @@ typedef struct SQInfo {
|
||||||
void* signature;
|
void* signature;
|
||||||
int32_t pointsInterpo;
|
int32_t pointsInterpo;
|
||||||
int32_t code; // error code to returned to client
|
int32_t code; // error code to returned to client
|
||||||
// sem_t dataReady;
|
//tsem_t dataReady;
|
||||||
|
|
||||||
void* tsdb;
|
void* tsdb;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -26,8 +26,8 @@ typedef struct {
|
||||||
int num;
|
int num;
|
||||||
int numOfReqs;
|
int numOfReqs;
|
||||||
int msgSize;
|
int msgSize;
|
||||||
sem_t rspSem;
|
tsem_t rspSem;
|
||||||
sem_t *pOverSem;
|
tsem_t *pOverSem;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
void *pRpc;
|
void *pRpc;
|
||||||
} SInfo;
|
} SInfo;
|
||||||
|
@ -39,7 +39,7 @@ static void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
sem_post(&pInfo->rspSem);
|
tsem_post(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tcount = 0;
|
static int tcount = 0;
|
||||||
|
@ -60,7 +60,7 @@ static void *sendRequest(void *param) {
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
|
||||||
if ( pInfo->num % 20000 == 0 )
|
if ( pInfo->num % 20000 == 0 )
|
||||||
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
sem_wait(&pInfo->rspSem);
|
tsem_wait(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("thread:%d, it is over", pInfo->index);
|
tDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
@ -171,7 +171,7 @@ int main(int argc, char *argv[]) {
|
||||||
pInfo->epSet = epSet;
|
pInfo->epSet = epSet;
|
||||||
pInfo->numOfReqs = numOfReqs;
|
pInfo->numOfReqs = numOfReqs;
|
||||||
pInfo->msgSize = msgSize;
|
pInfo->msgSize = msgSize;
|
||||||
sem_init(&pInfo->rspSem, 0, 0);
|
tsem_init(&pInfo->rspSem, 0, 0);
|
||||||
pInfo->pRpc = pRpc;
|
pInfo->pRpc = pRpc;
|
||||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||||
pInfo++;
|
pInfo++;
|
||||||
|
|
|
@ -27,8 +27,8 @@ typedef struct {
|
||||||
int num;
|
int num;
|
||||||
int numOfReqs;
|
int numOfReqs;
|
||||||
int msgSize;
|
int msgSize;
|
||||||
sem_t rspSem;
|
tsem_t rspSem;
|
||||||
sem_t *pOverSem;
|
tsem_t *pOverSem;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
void *pRpc;
|
void *pRpc;
|
||||||
} SInfo;
|
} SInfo;
|
||||||
|
@ -171,7 +171,7 @@ int main(int argc, char *argv[]) {
|
||||||
pInfo->epSet = epSet;
|
pInfo->epSet = epSet;
|
||||||
pInfo->numOfReqs = numOfReqs;
|
pInfo->numOfReqs = numOfReqs;
|
||||||
pInfo->msgSize = msgSize;
|
pInfo->msgSize = msgSize;
|
||||||
sem_init(&pInfo->rspSem, 0, 0);
|
tsem_init(&pInfo->rspSem, 0, 0);
|
||||||
pInfo->pRpc = pRpc;
|
pInfo->pRpc = pRpc;
|
||||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||||
pInfo++;
|
pInfo++;
|
||||||
|
|
|
@ -31,7 +31,7 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
|
||||||
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp);
|
||||||
static void arbProcessBrokenLink(void *param);
|
static void arbProcessBrokenLink(void *param);
|
||||||
static int arbProcessPeerMsg(void *param, void *buffer);
|
static int arbProcessPeerMsg(void *param, void *buffer);
|
||||||
static sem_t tsArbSem;
|
static tsem_t tsArbSem;
|
||||||
static ttpool_h tsArbTcpPool;
|
static ttpool_h tsArbTcpPool;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -61,7 +61,7 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sem_init(&tsArbSem, 0, 0) != 0) {
|
if (tsem_init(&tsArbSem, 0, 0) != 0) {
|
||||||
printf("failed to create exit semphore\n");
|
printf("failed to create exit semphore\n");
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
@ -98,7 +98,7 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsServerPort);
|
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsServerPort);
|
||||||
|
|
||||||
for (int res = sem_wait(&tsArbSem); res != 0; res = sem_wait(&tsArbSem)) {
|
for (int res = tsem_wait(&tsArbSem); res != 0; res = tsem_wait(&tsArbSem)) {
|
||||||
if (res != EINTR) break;
|
if (res != EINTR) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +185,6 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context)
|
||||||
sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
|
sInfo("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
|
||||||
|
|
||||||
// inform main thread to exit
|
// inform main thread to exit
|
||||||
sem_post(&tsArbSem);
|
tsem_post(&tsArbSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,8 @@ typedef struct {
|
||||||
int num;
|
int num;
|
||||||
int numOfReqs;
|
int numOfReqs;
|
||||||
int msgSize;
|
int msgSize;
|
||||||
sem_t rspSem;
|
tsem_t rspSem;
|
||||||
sem_t *pOverSem;
|
tsem_t *pOverSem;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
void *pRpc;
|
void *pRpc;
|
||||||
} SInfo;
|
} SInfo;
|
||||||
|
@ -38,7 +38,7 @@ void processResponse(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||||
if (pEpSet) pInfo->epSet = *pEpSet;
|
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
sem_post(&pInfo->rspSem);
|
tsem_post(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tcount = 0;
|
int tcount = 0;
|
||||||
|
@ -59,7 +59,7 @@ void *sendRequest(void *param) {
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
|
rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg);
|
||||||
if ( pInfo->num % 20000 == 0 )
|
if ( pInfo->num % 20000 == 0 )
|
||||||
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
uInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
sem_wait(&pInfo->rspSem);
|
tsem_wait(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("thread:%d, it is over", pInfo->index);
|
uDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
@ -169,7 +169,7 @@ int main(int argc, char *argv[]) {
|
||||||
pInfo->epSet = epSet;
|
pInfo->epSet = epSet;
|
||||||
pInfo->numOfReqs = numOfReqs;
|
pInfo->numOfReqs = numOfReqs;
|
||||||
pInfo->msgSize = msgSize;
|
pInfo->msgSize = msgSize;
|
||||||
sem_init(&pInfo->rspSem, 0, 0);
|
tsem_init(&pInfo->rspSem, 0, 0);
|
||||||
pInfo->pRpc = pRpc;
|
pInfo->pRpc = pRpc;
|
||||||
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||||
pInfo++;
|
pInfo++;
|
||||||
|
|
Loading…
Reference in New Issue