fix: windows compile issues

This commit is contained in:
dapan1121 2024-12-06 13:46:48 +08:00
parent aa7b0e1a61
commit abbb0663b0
5 changed files with 27 additions and 17 deletions

View File

@ -47,7 +47,7 @@ extern "C" {
#define QW_RETIRE_JOB_BATCH_NUM 5
#define QW_DEFAULT_TIMEOUT_INTERVAL_SECS 3600
#define QW_DEFAULT_TIMEOUT_INTERVAL_SECS 600
enum {
QW_CONC_TASK_LEVEL_LOW = 1,
@ -266,8 +266,10 @@ typedef struct SQWRetireCtx {
typedef struct SQueryExecStat {
int64_t taskInitNum;
int64_t taskRunNum;
int64_t taskExecDestroyNum;
int64_t taskSinkDestroyNum;
int64_t taskDestroyNum;
} SQueryExecStat;
typedef struct SQueryMgmt {

View File

@ -173,9 +173,10 @@ void qwDbgDumpJobsInfo(void) {
return;
}
qDebug("total remain job num %d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum));
qDebug("total remain job num %d, task initNum:%" PRId64 " - %" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64 " - %" PRId64,
taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), atomic_load_64(&gQueryMgmt.stat.taskRunNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum),
atomic_load_64(&gQueryMgmt.stat.taskDestroyNum));
size_t keyLen = 0;
char* id = NULL;

View File

@ -258,6 +258,8 @@ int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
}
}
atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);
if (acquire && ctx) {
QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
} else if (ctx) {
@ -425,6 +427,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
ctx->tbInfo = NULL;
QW_TASK_DLOG_E("task ctx dropped");
atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1);
return code;
}
@ -756,7 +760,7 @@ bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errC
QW_TASK_DLOG("start to stop task, forceStop:%d, error:%s", forceStop, tstrerror(errCode));
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
if ((!forceStop) && (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP))) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
@ -867,7 +871,7 @@ void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs) {
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
if ((ctx->lastAckTs <= 0) || (currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec) {
if (((ctx->lastAckTs <= 0) || (currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec) && (!QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP))) {
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}

View File

@ -541,8 +541,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_LOCK(QW_WRITE, &ctx->lock);
QW_SET_PHASE(ctx, phase);
if (ctx->pJobInfo && (atomic_load_8(&ctx->pJobInfo->retired) || atomic_load_32(&ctx->pJobInfo->errCode))) {
QW_TASK_ELOG("job already failed, error:%s", tstrerror(ctx->pJobInfo->errCode));
QW_ERR_JRET(ctx->pJobInfo->errCode);
@ -626,6 +624,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(ctx->rspCode);
}
QW_SET_PHASE(ctx, phase);
_return:
if (ctx) {
@ -635,7 +635,7 @@ _return:
qwReleaseTaskCtx(mgmt, ctx);
}
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) {
QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
} else {
QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
@ -800,7 +800,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
}
atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);
atomic_add_fetch_64(&gQueryMgmt.stat.taskRunNum, 1);
uint64_t flags = 0;
dsGetSinkFlags(sinkHandle, &flags);
@ -1648,9 +1648,11 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
}
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
qDebug("need to retire jobs in batch, targetRetireSize:%" PRId64 ", remainJobNum:%d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
retireSize, taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum));
qDebug("need to retire jobs in batch, targetRetireSize:%" PRId64 ", remainJobNum:%d, task initNum:%" PRId64 " - %" PRId64
", task destroyNum:%" PRId64 " - %" PRId64 " - %" PRId64,
retireSize, taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), atomic_load_64(&gQueryMgmt.stat.taskRunNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum),
atomic_load_64(&gQueryMgmt.stat.taskDestroyNum));
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
int32_t jobNum = 0;
@ -1683,8 +1685,9 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
qDebug("job retire in batch done, [prev:%d, curr:%d, total:%d] jobs, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64
", task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
", task initNum:%" PRId64 " - %" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64 " - %" PRId64,
alreadyJobNum, jobNum, taosHashGetSize(gQueryMgmt.pJobInfo), retiredSize, retireSize,
atomic_load_64(&gQueryMgmt.stat.taskInitNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum));
atomic_load_64(&gQueryMgmt.stat.taskInitNum), atomic_load_64(&gQueryMgmt.stat.taskRunNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum),
atomic_load_64(&gQueryMgmt.stat.taskDestroyNum));
}

View File

@ -118,7 +118,7 @@ char *taosStrndupi(const char *s, int64_t size) {
char *tstrndup(const char *str, int64_t size) {
#ifdef WINDOWS
return strndup(str, size);
return taosStrndupi(str, size);
#else
char* p = strndup(str, size);
if (str != NULL && NULL == p) {