remove redundant codes
This commit is contained in:
parent
26e4efe539
commit
160fd22802
|
@ -1181,7 +1181,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
|
|||
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
|
||||
pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
|
||||
|
||||
if (num > tsMaxNumOfOrderedResults) {
|
||||
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
|
||||
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId64 " , current:%" PRId64,
|
||||
pPObj, pSql, tsMaxNumOfOrderedResults, num);
|
||||
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_SORTED_RES_TOO_MANY);
|
||||
|
|
|
@ -4440,15 +4440,6 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
// if (FD_VALID(pSupporter->meterOutputFd)) {
|
||||
// assert(pSupporter->meterOutputMMapBuf != NULL);
|
||||
// dTrace("QInfo:%p disk-based output buffer during query:%" PRId64 " bytes", pQInfo, pSupporter->bufSize);
|
||||
// munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
|
||||
// tclose(pSupporter->meterOutputFd);
|
||||
|
||||
// unlink(pSupporter->extBufFile);
|
||||
// }
|
||||
|
||||
tSidSetDestroy(&pSupporter->pSidSet);
|
||||
|
||||
if (pSupporter->pMeterDataInfo != NULL) {
|
||||
|
@ -4501,12 +4492,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
|||
pQuery->lastKey = pQuery->skey;
|
||||
|
||||
// create runtime environment
|
||||
// SSchema *pColumnModel = NULL;
|
||||
|
||||
tTagSchema *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel;
|
||||
// if (pTagSchemaInfo != NULL) {
|
||||
// pColumnModel = pTagSchemaInfo->pSchema;
|
||||
// }
|
||||
|
||||
// get one queried meter
|
||||
SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid);
|
||||
|
@ -4543,23 +4529,6 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
|||
}
|
||||
|
||||
if (pQuery->nAggTimeInterval != 0) {
|
||||
// getTmpfilePath("tb_metric_mmap", pSupporter->extBufFile);
|
||||
// pSupporter->meterOutputFd = open(pSupporter->extBufFile, O_CREAT | O_RDWR, 0666);
|
||||
|
||||
// if (!FD_VALID(pSupporter->meterOutputFd)) {
|
||||
// dError("QInfo:%p failed to create file: %s on disk. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
||||
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
// }
|
||||
|
||||
// pSupporter->numOfPages = pSupporter->numOfMeters;
|
||||
|
||||
// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
|
||||
// if (ret != TSDB_CODE_SUCCESS) {
|
||||
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
|
||||
// strerror(errno));
|
||||
// return TSDB_CODE_SERV_NO_DISKSPACE;
|
||||
// }
|
||||
//
|
||||
// one page for each table at least
|
||||
ret = createResultBuf(&pSupporter->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4567,15 +4536,6 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
|||
}
|
||||
|
||||
pRuntimeEnv->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / pQuery->rowSize;
|
||||
// pSupporter->lastPageId = -1;
|
||||
// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
|
||||
|
||||
// pSupporter->meterOutputMMapBuf =
|
||||
// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
|
||||
// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
|
||||
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
||||
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
// }
|
||||
}
|
||||
|
||||
// metric query do not invoke interpolation, it will be done at the second-stage merge
|
||||
|
@ -5377,12 +5337,8 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
|
|||
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
|
||||
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
|
||||
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset);
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, 200000 + pSupporter->offset + (pSupporter->subgroupIdx - 1)* 10000);
|
||||
|
||||
// char * pStart = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * (pSupporter->lastPageId + 1) +
|
||||
// pSupporter->groupResultSize * pSupporter->offset;
|
||||
// uint64_t numOfElem = ((tFilePage *)pStart)->numOfElems;
|
||||
// assert(numOfElem <= pQuery->pointsToRead);
|
||||
int32_t total = 0;
|
||||
for(int32_t i = 0; i < list.size; ++i) {
|
||||
tFilePage* pData = getResultBufferPageById(pResultBuf, list.pData[i]);
|
||||
|
@ -5397,15 +5353,12 @@ void copyResToQueryResultBuf(SMeterQuerySupportObj *pSupporter, SQuery *pQuery)
|
|||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||
char* pDest = pQuery->sdata[i]->data;
|
||||
|
||||
memcpy(pQuery->sdata[i]->data + pRuntimeEnv->offset[i] * total + offset * bytes,
|
||||
pData->data + pRuntimeEnv->offset[i] * pData->numOfElems,
|
||||
bytes * pData->numOfElems);
|
||||
// pStart += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage);
|
||||
memcpy(pDest + offset*bytes, pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, bytes * pData->numOfElems);
|
||||
}
|
||||
|
||||
offset += pData->numOfElems;
|
||||
// pQuery->sdata[0]->len += pData->numOfElems;
|
||||
}
|
||||
|
||||
assert(pQuery->pointsRead == 0);
|
||||
|
@ -5466,7 +5419,6 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
|
|||
tFilePage *pPage = getMeterDataPage(pResultBuf, pValidMeter[pos]->pMeterQInfo, position->pageIdx);
|
||||
|
||||
int64_t ts = getCurrentTimestamp(&cs, pos);
|
||||
printf("++++++++++++++++++++++%d, %d, %lld\n", position->pageIdx, pos, ts);
|
||||
if (ts == lastTimestamp) {// merge with the last one
|
||||
doMerge(pRuntimeEnv, ts, pPage, position->rowIdx, true);
|
||||
} else {
|
||||
|
@ -5551,64 +5503,8 @@ int32_t doMergeMetersResultsToGroupRes(SMeterQuerySupportObj *pSupporter, SQuery
|
|||
return pSupporter->numOfGroupResultPages;
|
||||
}
|
||||
|
||||
//static int32_t extendDiskBuf(const SQuery *pQuery, SMeterQuerySupportObj *pSupporter, int32_t numOfPages) {
|
||||
// assert(pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE == pSupporter->bufSize);
|
||||
//
|
||||
// SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
|
||||
//
|
||||
// int32_t ret = munmap(pSupporter->meterOutputMMapBuf, pSupporter->bufSize);
|
||||
// pSupporter->numOfPages = numOfPages;
|
||||
//
|
||||
// /*
|
||||
// * disk-based output buffer is exhausted, try to extend the disk-based buffer, the available disk space may
|
||||
// * be insufficient
|
||||
// */
|
||||
// ret = ftruncate(pSupporter->meterOutputFd, pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE);
|
||||
// if (ret != 0) {
|
||||
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
|
||||
// strerror(errno));
|
||||
// pQInfo->code = -TSDB_CODE_SERV_NO_DISKSPACE;
|
||||
// pQInfo->killed = 1;
|
||||
//
|
||||
// return pQInfo->code;
|
||||
// }
|
||||
//
|
||||
// pSupporter->bufSize = pSupporter->numOfPages * DEFAULT_INTERN_BUF_SIZE;
|
||||
// pSupporter->meterOutputMMapBuf =
|
||||
// mmap(NULL, pSupporter->bufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pSupporter->meterOutputFd, 0);
|
||||
//
|
||||
// if (pSupporter->meterOutputMMapBuf == MAP_FAILED) {
|
||||
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
|
||||
// pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
// pQInfo->killed = 1;
|
||||
//
|
||||
// return pQInfo->code;
|
||||
// }
|
||||
//
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
|
||||
int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQuery,
|
||||
const SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
printf("group===============%d\n", pSupporter->numOfGroupResultPages);
|
||||
// int32_t numOfMeterResultBufPages = pSupporter->lastPageId + 1;
|
||||
// int64_t dstSize = numOfMeterResultBufPages * DEFAULT_INTERN_BUF_SIZE +
|
||||
// pSupporter->groupResultSize * (pSupporter->numOfGroupResultPages + 1);
|
||||
//
|
||||
// int32_t requiredPages = pSupporter->numOfPages;
|
||||
// if (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) {
|
||||
// while (requiredPages * DEFAULT_INTERN_BUF_SIZE < dstSize) {
|
||||
// requiredPages += pSupporter->numOfMeters;
|
||||
// }
|
||||
//
|
||||
// if (extendDiskBuf(pQuery, pSupporter, requiredPages) != TSDB_CODE_SUCCESS) {
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
|
||||
// char *lastPosition = pSupporter->meterOutputMMapBuf + DEFAULT_INTERN_BUF_SIZE * numOfMeterResultBufPages +
|
||||
// pSupporter->groupResultSize * pSupporter->numOfGroupResultPages;
|
||||
|
||||
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
|
||||
int32_t capacity = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage))/ pQuery->rowSize;
|
||||
|
||||
|
@ -5625,27 +5521,21 @@ int32_t flushFromResultBuf(SMeterQuerySupportObj *pSupporter, const SQuery *pQue
|
|||
r = capacity;
|
||||
}
|
||||
|
||||
tFilePage* buf = getNewDataBuf(pResultBuf, base + pSupporter->numOfGroupResultPages, &pageId);
|
||||
tFilePage* buf = getNewDataBuf(pResultBuf, base + pSupporter->subgroupIdx*10000 + pSupporter->numOfGroupResultPages, &pageId);
|
||||
|
||||
//pagewise copy to dest buffer
|
||||
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||
buf->numOfElems = r;
|
||||
|
||||
memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char*)pQuery->sdata[i]->data) + offset * bytes, buf->numOfElems * bytes);
|
||||
memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char*)pQuery->sdata[i]->data) + offset * bytes,
|
||||
buf->numOfElems * bytes);
|
||||
}
|
||||
|
||||
offset += r;
|
||||
remain -= r;
|
||||
}
|
||||
|
||||
// for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
|
||||
// int32_t size = pRuntimeEnv->pCtx[i].outputBytes * pQuery->sdata[0]->len + sizeof(tFilePage);
|
||||
// memcpy(lastPosition, pQuery->sdata[i], size);
|
||||
//
|
||||
// lastPosition += pRuntimeEnv->pCtx[i].outputBytes * pQuery->pointsToRead + sizeof(tFilePage);
|
||||
// }
|
||||
|
||||
pSupporter->numOfGroupResultPages += 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -6451,35 +6341,6 @@ void changeMeterQueryInfoForSuppleQuery(SQueryResultBuf* pResultBuf, SMeterQuery
|
|||
}
|
||||
}
|
||||
|
||||
//static tFilePage *allocNewPage(SQuery *pQuery, SMeterQuerySupportObj *pSupporter, uint32_t *pageId) {
|
||||
// if (pSupporter->lastPageId == pSupporter->numOfPages - 1) {
|
||||
// if (extendDiskBuf(pQuery, pSupporter, pSupporter->numOfPages + pSupporter->numOfMeters) != TSDB_CODE_SUCCESS) {
|
||||
// return NULL;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// *pageId = (++pSupporter->lastPageId);
|
||||
// return getFilePage(pSupporter, *pageId);
|
||||
//}
|
||||
|
||||
//tFilePage *addDataPageForMeterQueryInfo(SQuery *pQuery, SMeterQueryInfo *pMeterQueryInfo,
|
||||
// SMeterQuerySupportObj *pSupporter) {
|
||||
// uint32_t pageId = 0;
|
||||
//
|
||||
// tFilePage *pPage = allocNewPage(pQuery, pSupporter, &pageId);
|
||||
// if (pPage == NULL) { // failed to allocate disk-based buffer for intermediate results
|
||||
// return NULL;
|
||||
// }
|
||||
//
|
||||
// if (pMeterQueryInfo->numOfPages >= pMeterQueryInfo->numOfAlloc) {
|
||||
// pMeterQueryInfo->numOfAlloc = pMeterQueryInfo->numOfAlloc << 1;
|
||||
// pMeterQueryInfo->pageList = realloc(pMeterQueryInfo->pageList, sizeof(uint32_t) * pMeterQueryInfo->numOfAlloc);
|
||||
// }
|
||||
//
|
||||
// pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages++] = pageId;
|
||||
// return pPage;
|
||||
//}
|
||||
|
||||
void saveIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -7006,20 +6867,15 @@ int32_t setOutputBufferForIntervalQuery(SMeterQuerySupportObj *pSupporter, SMete
|
|||
tFilePage * pData = NULL;
|
||||
|
||||
SQueryResultBuf* pResultBuf = pSupporter->pResultBuf;
|
||||
// SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// in the first scan, new space needed for results
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
|
||||
int32_t pageId = -1;
|
||||
if (list.size == 0) {
|
||||
// pData = addDataPageForMeterQueryInfo(pQuery, pMeterQueryInfo, pSupporter);
|
||||
pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId);
|
||||
} else {
|
||||
// int32_t lastPageId = pMeterQueryInfo->pageList[pMeterQueryInfo->numOfPages - 1];
|
||||
pData = getResultBufferPageById(pResultBuf, getLastPageId(&list));
|
||||
// pData = getFilePage(pSupporter, lastPageId);
|
||||
|
||||
printf("==============%d\n", pData->numOfElems);
|
||||
if (pData->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
|
||||
pData = getNewDataBuf(pResultBuf, pMeterQueryInfo->sid, &pageId);
|
||||
if (pData != NULL) {
|
||||
|
@ -7637,7 +7493,6 @@ int32_t saveResult(SMeterQuerySupportObj *pSupporter, SMeterQueryInfo *pMeterQue
|
|||
|
||||
// in handling records occuring around '1970-01-01', the aligned start timestamp may be 0.
|
||||
TSKEY ts = *(TSKEY *)getOutputResPos(pRuntimeEnv, pData, pData->numOfElems, 0);
|
||||
printf("-----------------------%d\n", pData->numOfElems);
|
||||
|
||||
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
|
||||
qTrace("QInfo:%p vid:%d sid:%d id:%s, save results, ts:%" PRId64 ", total:%d", GET_QINFO_ADDR(pQuery),
|
||||
|
|
|
@ -173,7 +173,12 @@ tFilePage* getNewDataBuf(SQueryResultBuf* pResultBuf, int32_t groupId, int32_t*
|
|||
*pageId = (pResultBuf->allocateId++);
|
||||
registerPageId(pResultBuf, groupId, *pageId);
|
||||
|
||||
return getResultBufferPageById(pResultBuf, *pageId);
|
||||
tFilePage* page = getResultBufferPageById(pResultBuf, *pageId);
|
||||
|
||||
// clear memory for the new page
|
||||
memset(page, 0, DEFAULT_INTERN_BUF_SIZE);
|
||||
|
||||
return page;
|
||||
}
|
||||
|
||||
int32_t getNumOfRowsPerPage(SQueryResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
|
||||
|
|
Loading…
Reference in New Issue