[td-225] fix compiler error.
This commit is contained in:
parent
bab798f439
commit
59e7bcf1dd
|
@ -7058,198 +7058,6 @@ bool doBuildResCheck(SQInfo* pQInfo) {
|
||||||
return buildRes;
|
return buildRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool qTableQuery(qinfo_t qinfo) {
|
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
|
||||||
assert(pQInfo && pQInfo->signature == pQInfo);
|
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
|
||||||
|
|
||||||
int64_t curOwner = 0;
|
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
|
|
||||||
qError("QInfo:%p qhandle is now executed by thread:%p", pQInfo, (void*) curOwner);
|
|
||||||
pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
pQInfo->startExecTs = taosGetTimestampSec();
|
|
||||||
|
|
||||||
if (isQueryKilled(pQInfo)) {
|
|
||||||
qDebug("QInfo:%p it is already killed, abort", pQInfo);
|
|
||||||
return doBuildResCheck(pQInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
|
||||||
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
|
|
||||||
setQueryStatus(pQInfo->runtimeEnv.pQuery, QUERY_COMPLETED);
|
|
||||||
return doBuildResCheck(pQInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
// error occurs, record the error code and return to client
|
|
||||||
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
pQInfo->code = ret;
|
|
||||||
qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
|
|
||||||
return doBuildResCheck(pQInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("QInfo:%p query task is launched", pQInfo);
|
|
||||||
|
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
||||||
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
|
||||||
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
|
|
||||||
buildTagQueryResult(pQInfo);
|
|
||||||
} else if (pQInfo->runtimeEnv.stableQuery) {
|
|
||||||
stableQueryImpl(pQInfo);
|
|
||||||
} else if (pQInfo->runtimeEnv.queryBlockDist){
|
|
||||||
buildTableBlockDistResult(pQInfo);
|
|
||||||
} else {
|
|
||||||
tableQueryImpl(pQInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
|
||||||
if (isQueryKilled(pQInfo)) {
|
|
||||||
qDebug("QInfo:%p query is killed", pQInfo);
|
|
||||||
} else if (pQuery->rec.rows == 0) {
|
|
||||||
qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
|
|
||||||
} else {
|
|
||||||
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
|
|
||||||
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
|
|
||||||
}
|
|
||||||
|
|
||||||
return doBuildResCheck(pQInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext) {
|
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
|
||||||
|
|
||||||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
|
||||||
qError("QInfo:%p invalid qhandle", pQInfo);
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
*buildRes = false;
|
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
|
||||||
qDebug("QInfo:%p query is killed, code:0x%08x", pQInfo, pQInfo->code);
|
|
||||||
return pQInfo->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
if (tsRetrieveBlockingModel) {
|
|
||||||
pQInfo->rspContext = pRspContext;
|
|
||||||
tsem_wait(&pQInfo->ready);
|
|
||||||
*buildRes = true;
|
|
||||||
code = pQInfo->code;
|
|
||||||
} else {
|
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
||||||
|
|
||||||
pthread_mutex_lock(&pQInfo->lock);
|
|
||||||
|
|
||||||
assert(pQInfo->rspContext == NULL);
|
|
||||||
if (pQInfo->dataReady == QUERY_RESULT_READY) {
|
|
||||||
*buildRes = true;
|
|
||||||
qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%" PRId64 ", code:%s", pQInfo, pQuery->resultRowSize,
|
|
||||||
pQuery->rec.rows, tstrerror(pQInfo->code));
|
|
||||||
} else {
|
|
||||||
*buildRes = false;
|
|
||||||
qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo);
|
|
||||||
pQInfo->rspContext = pRspContext;
|
|
||||||
assert(pQInfo->rspContext != NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = pQInfo->code;
|
|
||||||
pthread_mutex_unlock(&pQInfo->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
|
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
|
||||||
|
|
||||||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
|
||||||
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
|
|
||||||
|
|
||||||
size += sizeof(int32_t);
|
|
||||||
size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo);
|
|
||||||
|
|
||||||
*contLen = (int32_t)(size + sizeof(SRetrieveTableRsp));
|
|
||||||
|
|
||||||
// todo proper handle failed to allocate memory,
|
|
||||||
// current solution only avoid crash, but cannot return error code to client
|
|
||||||
*pRsp = (SRetrieveTableRsp *)rpcMallocCont(*contLen);
|
|
||||||
if (*pRsp == NULL) {
|
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pRsp)->numOfRows = htonl((int32_t)pQuery->rec.rows);
|
|
||||||
|
|
||||||
if (pQInfo->code == TSDB_CODE_SUCCESS) {
|
|
||||||
(*pRsp)->offset = htobe64(pQuery->limit.offset);
|
|
||||||
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
|
|
||||||
} else {
|
|
||||||
(*pRsp)->offset = 0;
|
|
||||||
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pRsp)->precision = htons(pQuery->precision);
|
|
||||||
if (pQuery->rec.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
|
|
||||||
doDumpQueryResult(pQInfo, (*pRsp)->data);
|
|
||||||
} else {
|
|
||||||
setQueryStatus(pQuery, QUERY_OVER);
|
|
||||||
}
|
|
||||||
|
|
||||||
pQInfo->rspContext = NULL;
|
|
||||||
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
|
||||||
|
|
||||||
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
|
||||||
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
|
|
||||||
*continueExec = false;
|
|
||||||
(*pRsp)->completed = 1; // notify no more result to client
|
|
||||||
} else {
|
|
||||||
*continueExec = true;
|
|
||||||
qDebug("QInfo:%p has more results to retrieve", pQInfo);
|
|
||||||
}
|
|
||||||
if (pQInfo->code != TSDB_CODE_SUCCESS) {
|
|
||||||
rpcFreeCont(*pRsp);
|
|
||||||
*pRsp = NULL;
|
|
||||||
}
|
|
||||||
return pQInfo->code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qQueryCompleted(qinfo_t qinfo) {
|
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
|
||||||
|
|
||||||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
|
||||||
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qKillQuery(qinfo_t qinfo) {
|
|
||||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
|
||||||
|
|
||||||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
}
|
|
||||||
|
|
||||||
setQueryKilled(pQInfo);
|
|
||||||
|
|
||||||
// Wait for the query executing thread being stopped/
|
|
||||||
// Once the query is stopped, the owner of qHandle will be cleared immediately.
|
|
||||||
while (pQInfo->owner != 0) {
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
|
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
|
||||||
if (val == NULL) {
|
if (val == NULL) {
|
||||||
setNull(output, type, bytes);
|
setNull(output, type, bytes);
|
||||||
|
|
|
@ -353,6 +353,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
|
||||||
qDebug("QInfo:%p has more results to retrieve", pQInfo);
|
qDebug("QInfo:%p has more results to retrieve", pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pQInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
rpcFreeCont(*pRsp);
|
||||||
|
*pRsp = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return pQInfo->code;
|
return pQInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue