Merge pull request #2357 from taosdata/feature/query
[td-225] add cancel query support, fix bugs in load file blocks when …
This commit is contained in:
commit
c9860ac555
|
@ -294,11 +294,12 @@ typedef struct STscObj {
|
||||||
} STscObj;
|
} STscObj;
|
||||||
|
|
||||||
typedef struct SSqlObj {
|
typedef struct SSqlObj {
|
||||||
void * signature;
|
void *signature;
|
||||||
STscObj *pTscObj;
|
STscObj *pTscObj;
|
||||||
void (*fp)();
|
void *SRpcReqContext;
|
||||||
void (*fetchFp)();
|
void (*fp)();
|
||||||
void * param;
|
void (*fetchFp)();
|
||||||
|
void *param;
|
||||||
int64_t stime;
|
int64_t stime;
|
||||||
uint32_t queryId;
|
uint32_t queryId;
|
||||||
void * pStream;
|
void * pStream;
|
||||||
|
|
|
@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
.handle = pSql,
|
.handle = pSql,
|
||||||
.code = 0
|
.code = 0
|
||||||
};
|
};
|
||||||
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
|
|
||||||
|
|
||||||
|
pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
|
||||||
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
|
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
|
||||||
*/
|
*/
|
||||||
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
// taosStopRpcConn(pSql->pSubs[i]->);
|
rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//taosStopRpcConn(pSql->thandle);
|
rpcCancelRequest(pSql->SRpcReqContext);
|
||||||
tscTrace("%p query is cancelled", res);
|
tscTrace("%p query is cancelled", res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
|
||||||
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||||
void rpcCanelRequest(void *pContext);
|
void rpcCancelRequest(void *pContext);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -792,38 +792,77 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
|
||||||
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
|
|
||||||
//data in buffer has greater timestamp, copy data in file block
|
//data in buffer has greater timestamp, copy data in file block
|
||||||
for (int32_t i = 0; i < requiredNumOfCols; ++i) {
|
int32_t i = 0, j = 0;
|
||||||
|
while(i < requiredNumOfCols && j < pCols->numOfCols) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
int32_t bytes = pColInfo->info.bytes;
|
|
||||||
|
SDataCol* src = &pCols->cols[j];
|
||||||
|
if (src->colId < pColInfo->info.colId) {
|
||||||
|
j++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t bytes = pColInfo->info.bytes;
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
|
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
|
||||||
} else {
|
} else {
|
||||||
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance
|
if (pColInfo->info.colId == src->colId) {
|
||||||
SDataCol* src = &pCols->cols[j];
|
|
||||||
|
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
|
||||||
if (pColInfo->info.colId == src->colId) {
|
memmove(pData, src->pData + bytes * start, bytes * num);
|
||||||
|
} else { // handle the var-string
|
||||||
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
|
char* dst = pData;
|
||||||
memmove(pData, src->pData + bytes * start, bytes * num);
|
|
||||||
} else { // handle the var-string
|
// todo refactor, only copy one-by-one
|
||||||
char* dst = pData;
|
for (int32_t k = start; k < num + start; ++k) {
|
||||||
|
char* p = tdGetColDataOfRow(src, k);
|
||||||
// todo refactor, only copy one-by-one
|
memcpy(dst, p, varDataTLen(p));
|
||||||
for (int32_t k = start; k < num + start; ++k) {
|
dst += bytes;
|
||||||
char* p = tdGetColDataOfRow(src, k);
|
|
||||||
memcpy(dst, p, varDataTLen(p));
|
|
||||||
dst += bytes;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
j++;
|
||||||
|
i++;
|
||||||
|
} else { // pColInfo->info.colId < src->colId, it is a NULL data
|
||||||
|
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
char* dst = pData;
|
||||||
|
|
||||||
|
for(int32_t k = start; k < num + start; ++k) {
|
||||||
|
setVardataNull(dst, pColInfo->info.type);
|
||||||
|
dst += bytes;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
|
||||||
|
}
|
||||||
|
i++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (i < requiredNumOfCols) { // the remain columns are all null data
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
|
||||||
|
} else {
|
||||||
|
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
char* dst = pData;
|
||||||
|
|
||||||
|
for(int32_t k = start; k < num + start; ++k) {
|
||||||
|
setVardataNull(dst, pColInfo->info.type);
|
||||||
|
dst += pColInfo->info.bytes;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
|
||||||
|
}
|
||||||
|
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
pQueryHandle->cur.win.ekey = tsArray[end];
|
pQueryHandle->cur.win.ekey = tsArray[end];
|
||||||
pQueryHandle->cur.lastKey = tsArray[end] + step;
|
pQueryHandle->cur.lastKey = tsArray[end] + step;
|
||||||
|
|
Loading…
Reference in New Issue