Merge pull request #1542 from taosdata/feature/query
[td-98] fix bugs in query processing
This commit is contained in:
commit
af3944bb73
|
@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
||||||
sql = sToken.z;
|
sql = sToken.z;
|
||||||
}
|
}
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
|
|
||||||
|
if (pSql->asyncTblPos == NULL) {
|
||||||
|
assert(code == TSDB_CODE_ACTION_IN_PROGRESS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = cend - cstart + 1;
|
int32_t len = cend - cstart + 1;
|
||||||
|
@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
||||||
|
|
||||||
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
|
||||||
/*
|
/*
|
||||||
* For async insert, after get the metermeta from server, the sql string will not be
|
* For async insert, after get the table meta from server, the sql string will not be
|
||||||
* parsed using the new metermeta to avoid the overhead cause by get metermeta data information.
|
* parsed using the new table meta to avoid the overhead cause by get table meta data information.
|
||||||
* And during the getMeterMetaCallback function, the sql string will be parsed from the
|
* And during the getMeterMetaCallback function, the sql string will be parsed from the
|
||||||
* interrupted position.
|
* interrupted position.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -5168,7 +5168,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
|
||||||
if (isQueryKilled(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
dTrace("QInfo:%p query is killed", pQInfo);
|
dTrace("QInfo:%p query is killed", pQInfo);
|
||||||
} else {
|
} else {
|
||||||
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size);
|
dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.size,
|
||||||
|
pQuery->rec.total);
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
|
@ -5838,7 +5839,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
|
|
||||||
vnodeParametersSafetyCheck(pQuery);
|
vnodeParametersSafetyCheck(pQuery);
|
||||||
|
|
||||||
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
|
dTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
|
||||||
return pQInfo;
|
return pQInfo;
|
||||||
|
|
||||||
_clean_memory:
|
_clean_memory:
|
||||||
|
@ -6157,18 +6158,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
if (isQueryKilled(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
|
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
|
||||||
if (pQInfo->code == TSDB_CODE_SUCCESS) {
|
return pQInfo->code;
|
||||||
return TSDB_CODE_QUERY_CANCELLED;
|
|
||||||
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
|
|
||||||
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_wait(&pQInfo->dataReady);
|
sem_wait(&pQInfo->dataReady);
|
||||||
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
|
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
|
||||||
pQInfo->code);
|
pQInfo->code);
|
||||||
|
|
||||||
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
|
return pQInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
|
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
|
||||||
|
@ -6213,6 +6210,7 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
|
||||||
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
|
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
|
||||||
code = doDumpQueryResult(pQInfo, (*pRsp)->data);
|
code = doDumpQueryResult(pQInfo, (*pRsp)->data);
|
||||||
} else {
|
} else {
|
||||||
|
setQueryStatus(pQuery, QUERY_OVER);
|
||||||
code = pQInfo->code;
|
code = pQInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,9 +49,8 @@ typedef struct SQueryFilePos {
|
||||||
} SQueryFilePos;
|
} SQueryFilePos;
|
||||||
|
|
||||||
typedef struct SDataBlockLoadInfo {
|
typedef struct SDataBlockLoadInfo {
|
||||||
int32_t fileListIndex;
|
SFileGroup* fileGroup;
|
||||||
int32_t fileId;
|
int32_t slot;
|
||||||
int32_t slotIdx;
|
|
||||||
int32_t sid;
|
int32_t sid;
|
||||||
SArray *pLoadedCols;
|
SArray *pLoadedCols;
|
||||||
} SDataBlockLoadInfo;
|
} SDataBlockLoadInfo;
|
||||||
|
@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
|
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
|
||||||
pBlockLoadInfo->slotIdx = -1;
|
pBlockLoadInfo->slot = -1;
|
||||||
pBlockLoadInfo->fileId = -1;
|
|
||||||
pBlockLoadInfo->sid = -1;
|
pBlockLoadInfo->sid = -1;
|
||||||
pBlockLoadInfo->fileListIndex = -1;
|
pBlockLoadInfo->fileGroup = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
|
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
|
||||||
|
@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
|
||||||
pCompBlockLoadInfo->fileListIndex = -1;
|
pCompBlockLoadInfo->fileListIndex = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int fileOrderComparFn(const void *p1, const void *p2) {
|
|
||||||
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
|
|
||||||
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
|
|
||||||
|
|
||||||
if (pInfo1->fileId == pInfo2->fileId) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
|
|
||||||
char suffix[] = ".head";
|
|
||||||
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
|
|
||||||
|
|
||||||
struct dirent *pEntry = NULL;
|
|
||||||
pVnodeFilesInfo->vnodeId = vnodeId;
|
|
||||||
char* tsDirectory = "";
|
|
||||||
|
|
||||||
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
|
|
||||||
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
|
|
||||||
if (pDir == NULL) {
|
|
||||||
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
|
|
||||||
// strerror(errno));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
while ((pEntry = readdir(pDir)) != NULL) {
|
|
||||||
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->d_type & DT_DIR) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t len = strlen(pEntry->d_name);
|
|
||||||
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vid = 0;
|
|
||||||
int32_t fid = 0;
|
|
||||||
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
|
|
||||||
if (vid != vnodeId) { /* ignore error files */
|
|
||||||
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
|
|
||||||
// if (fid > pVnode->fileId || fid < firstFid) {
|
|
||||||
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
|
|
||||||
// fid, firstFid, pVnode->fileId);
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
assert(fid >= 0 && vid >= 0);
|
|
||||||
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
|
|
||||||
}
|
|
||||||
|
|
||||||
closedir(pDir);
|
|
||||||
|
|
||||||
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
|
|
||||||
// pVnodeFilesInfo->dbFilePathPrefix);
|
|
||||||
|
|
||||||
// order the files information according their names */
|
|
||||||
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
|
|
||||||
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
|
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
|
||||||
// todo 1. filter not exist table
|
// todo 1. filter not exist table
|
||||||
|
|
||||||
|
@ -807,6 +735,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
|
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
|
||||||
|
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||||
|
pBlockLoadInfo->fileGroup = pCheckInfo->pFileGroup;
|
||||||
|
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
|
||||||
|
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
|
||||||
|
|
||||||
blockLoaded = true;
|
blockLoaded = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,6 +748,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
|
||||||
|
|
||||||
// failed to load data from disk, abort current query
|
// failed to load data from disk, abort current query
|
||||||
if (blockLoaded == false) {
|
if (blockLoaded == false) {
|
||||||
|
taosArrayDestroy(sa);
|
||||||
|
tfree(data);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1001,10 +937,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else {
|
} else {
|
||||||
SArray *sa = getDefaultLoadColumns(pHandle, true);
|
SArray *sa = getDefaultLoadColumns(pHandle, true);
|
||||||
|
|
||||||
doLoadDataFromFileBlock(pHandle);
|
// data block has been loaded, todo extract method
|
||||||
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
|
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
|
||||||
return pHandle->pColumns;
|
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
|
||||||
|
return pHandle->pColumns;
|
||||||
|
} else {
|
||||||
|
doLoadDataFromFileBlock(pHandle);
|
||||||
|
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
|
||||||
|
return pHandle->pColumns;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1361,7 +1303,10 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
||||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||||
tSkipListDestroyIter(pTableCheckInfo->iter);
|
tSkipListDestroyIter(pTableCheckInfo->iter);
|
||||||
|
|
||||||
tfree(pTableCheckInfo->pDataCols->buf);
|
if (pTableCheckInfo->pDataCols != NULL) {
|
||||||
|
tfree(pTableCheckInfo->pDataCols->buf);
|
||||||
|
}
|
||||||
|
|
||||||
tfree(pTableCheckInfo->pDataCols);
|
tfree(pTableCheckInfo->pDataCols);
|
||||||
|
|
||||||
tfree(pTableCheckInfo->pCompInfo);
|
tfree(pTableCheckInfo->pCompInfo);
|
||||||
|
|
Loading…
Reference in New Issue