[td-225] fix bugs for fill operation in both table/super table query

This commit is contained in:
hjLiao 2020-05-16 01:51:50 +08:00
parent 08968b94bb
commit 9376be5fa5
19 changed files with 628 additions and 643 deletions

View File

@ -60,7 +60,7 @@ typedef struct SLocalReducer {
char * prevRowOfInput; char * prevRowOfInput;
tFilePage * pResultBuf; tFilePage * pResultBuf;
int32_t nResultBufSize; int32_t nResultBufSize;
char * pBufForInterpo; // intermediate buffer for interpolation // char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
@ -68,9 +68,9 @@ typedef struct SLocalReducer {
bool hasPrevRow; // cannot be released bool hasPrevRow; // cannot be released
bool hasUnprocessedRow; bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
SColumnModel * resColModel; SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SInterpolationInfo interpolationInfo; // interpolation support structure SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo char * pFinalRes; // result data after interpo
tFilePage * discardData; tFilePage * discardData;
SResultInfo * pResInfo; SResultInfo * pResInfo;

View File

@ -210,7 +210,7 @@ typedef struct SQueryInfo {
SLimitVal slimit; SLimitVal slimit;
STagCond tagCond; STagCond tagCond;
SOrderVal order; SOrderVal order;
int16_t interpoType; // interpolate type int16_t fillType; // interpolate type
int16_t numOfTables; int16_t numOfTables;
STableMetaInfo **pTableMetaInfo; STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf; struct STSBuf * tsBuf;

View File

@ -4159,16 +4159,16 @@ static void interp_function(SQLFunctionCtx *pCtx) {
SInterpInfoDetail *pInfoDetail = interpInfo.pInterpDetail; SInterpInfoDetail *pInfoDetail = interpInfo.pInterpDetail;
/* set no output result */ /* set no output result */
if (pInfoDetail->type == TSDB_INTERPO_NONE) { if (pInfoDetail->type == TSDB_FILL_NONE) {
pCtx->param[3].i64Key = 0; pCtx->param[3].i64Key = 0;
} else if (pInfoDetail->primaryCol == 1) { } else if (pInfoDetail->primaryCol == 1) {
*(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts; *(TSKEY *)pCtx->aOutputBuf = pInfoDetail->ts;
} else { } else {
if (pInfoDetail->type == TSDB_INTERPO_NULL) { if (pInfoDetail->type == TSDB_FILL_NULL) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} else if (pInfoDetail->type == TSDB_INTERPO_SET_VALUE) { } else if (pInfoDetail->type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType); tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType);
} else if (pInfoDetail->type == TSDB_INTERPO_PREV) { } else if (pInfoDetail->type == TSDB_FILL_PREV) {
char *data = pCtx->param[1].pz; char *data = pCtx->param[1].pz;
char *pVal = data + TSDB_KEYSIZE; char *pVal = data + TSDB_KEYSIZE;
@ -4179,7 +4179,7 @@ static void interp_function(SQLFunctionCtx *pCtx) {
assignVal(pCtx->aOutputBuf, pVal, pCtx->outputBytes, pCtx->outputType); assignVal(pCtx->aOutputBuf, pVal, pCtx->outputBytes, pCtx->outputType);
} }
} else if (pInfoDetail->type == TSDB_INTERPO_LINEAR) { } else if (pInfoDetail->type == TSDB_FILL_LINEAR) {
char *data1 = pCtx->param[1].pz; char *data1 = pCtx->param[1].pz;
char *data2 = pCtx->param[2].pz; char *data2 = pCtx->param[2].pz;

View File

@ -4020,19 +4020,19 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
} }
if (strncasecmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) { if (strncasecmp(pItem->pVar.pz, "none", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_NONE; pQueryInfo->fillType = TSDB_FILL_NONE;
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) { } else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_NULL; pQueryInfo->fillType = TSDB_FILL_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) { for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD* pFields = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes); setNull((char*)&pQueryInfo->defaultVal[i], pFields->type, pFields->bytes);
} }
} else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) { } else if (strncasecmp(pItem->pVar.pz, "prev", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->interpoType = TSDB_INTERPO_PREV; pQueryInfo->fillType = TSDB_FILL_PREV;
} else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) { } else if (strncasecmp(pItem->pVar.pz, "linear", 6) == 0 && pItem->pVar.nLen == 6) {
pQueryInfo->interpoType = TSDB_INTERPO_LINEAR; pQueryInfo->fillType = TSDB_FILL_LINEAR;
} else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) { } else if (strncasecmp(pItem->pVar.pz, "value", 5) == 0 && pItem->pVar.nLen == 5) {
pQueryInfo->interpoType = TSDB_INTERPO_SET_VALUE; pQueryInfo->fillType = TSDB_FILL_SET_VALUE;
if (pFillToken->nExpr == 1) { if (pFillToken->nExpr == 1) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);

View File

@ -25,7 +25,7 @@
typedef struct SCompareParam { typedef struct SCompareParam {
SLocalDataSource **pLocalData; SLocalDataSource **pLocalData;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
int32_t numOfElems; int32_t num;
int32_t groupOrderType; int32_t groupOrderType;
} SCompareParam; } SCompareParam;
@ -47,11 +47,11 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
} }
if (pParam->groupOrderType == TSDB_ORDER_DESC) { // desc if (pParam->groupOrderType == TSDB_ORDER_DESC) { // desc
return compare_d(pDesc, pParam->numOfElems, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data, return compare_d(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data,
pParam->numOfElems, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data); pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data);
} else { } else {
return compare_a(pDesc, pParam->numOfElems, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data, return compare_a(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data,
pParam->numOfElems, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data); pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data);
} }
} }
@ -132,6 +132,26 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
} }
} }
static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
int32_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pFillCol[i].col.bytes = pExpr->resBytes;
pFillCol[i].col.type = pExpr->resType;
pFillCol[i].flag = pExpr->colInfo.flag;
pFillCol[i].col.offset = offset;
pFillCol[i].functionId = pExpr->functionId;
pFillCol[i].defaultVal.i = pQueryInfo->defaultVal[i];
offset += pExpr->resBytes;
}
return pFillCol;
}
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc, void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalmodel, SSqlObj* pSql) { SColumnModel *finalmodel, SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
@ -217,24 +237,24 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
ds->pMemBuffer = pMemBuffer[i]; ds->pMemBuffer = pMemBuffer[i];
ds->flushoutIdx = j; ds->flushoutIdx = j;
ds->filePage.numOfElems = 0; ds->filePage.num = 0;
ds->pageId = 0; ds->pageId = 0;
ds->rowIdx = 0; ds->rowIdx = 0;
tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1); tscTrace("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1);
tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0); tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.numOfElems); printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num);
SSrcColumnInfo colInfo[256] = {0}; SSrcColumnInfo colInfo[256] = {0};
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo); tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.numOfElems, tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.num,
pMemBuffer[0]->numOfElemsPerPage, colInfo); pMemBuffer[0]->numOfElemsPerPage, colInfo);
#endif #endif
if (ds->filePage.numOfElems == 0) { // no data in this flush, the index does not increase if (ds->filePage.num == 0) { // no data in this flush, the index does not increase
tscTrace("%p flush data is empty, ignore %d flush record", pSql, idx); tscTrace("%p flush data is empty, ignore %d flush record", pSql, idx);
tfree(ds); tfree(ds);
continue; continue;
@ -254,7 +274,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
SCompareParam *param = malloc(sizeof(SCompareParam)); SCompareParam *param = malloc(sizeof(SCompareParam));
param->pLocalData = pReducer->pLocalDataSrc; param->pLocalData = pReducer->pLocalDataSrc;
param->pDesc = pReducer->pDesc; param->pDesc = pReducer->pDesc;
param->numOfElems = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage; param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType; param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
@ -295,25 +315,25 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
assert(finalRowLength <= pReducer->rowSize); assert(finalRowLength <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize); // pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize);
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
pReducer->pBufForInterpo == NULL || pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { /*pReducer->pBufForInterpo == NULL || */pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) {
tfree(pReducer->pTempBuffer); tfree(pReducer->pTempBuffer);
tfree(pReducer->discardData); tfree(pReducer->discardData);
tfree(pReducer->pResultBuf); tfree(pReducer->pResultBuf);
tfree(pReducer->pFinalRes); tfree(pReducer->pFinalRes);
tfree(pReducer->pBufForInterpo); // tfree(pReducer->pBufForInterpo);
tfree(pReducer->prevRowOfInput); tfree(pReducer->prevRowOfInput);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return; return;
} }
size = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->numOfElems = 0; size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pResInfo = calloc(size, sizeof(SResultInfo));
pReducer->pTempBuffer->num = 0;
pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc); tscInitSqlContext(pCmd, pReducer, pDesc);
@ -333,55 +353,58 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->numOfGroups = 0; pRes->numOfGroups = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);;
int16_t prec = tinfo.precision; TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo; if (pQueryInfo->fillType != TSDB_FILL_NONE) {
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols, SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
pReducer->rowSize); pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
4096, numOfCols, pQueryInfo->slidingTime, pQueryInfo->fillType, pFillCol);
}
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols; int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
pInterpoInfo->pTags[0] = (char *)pInterpoInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols; pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1); SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
pInterpoInfo->pTags[i] = pSchema->bytes + pInterpoInfo->pTags[i - 1]; pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1];
} }
} else { } else {
assert(pInterpoInfo->pTags == NULL); if (pReducer->pFillInfo != NULL) {
assert(pReducer->pFillInfo->pTags == NULL);
}
} }
} }
static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
int32_t orderType) { int32_t orderType) {
if (pPage->numOfElems == 0) { if (pPage->num == 0) {
return 0; return 0;
} }
assert(pPage->numOfElems <= pDesc->pColumnModel->capacity); assert(pPage->num <= pDesc->pColumnModel->capacity);
// sort before flush to disk, the data must be consecutively put on tFilePage. // sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderIdx.numOfCols > 0) { if (pDesc->orderIdx.numOfCols > 0) {
tColDataQSort(pDesc, pPage->numOfElems, 0, pPage->numOfElems - 1, pPage->data, orderType); tColDataQSort(pDesc, pPage->num, 0, pPage->num - 1, pPage->data, orderType);
} }
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk after been sorted:\n", pPage->numOfElems); printf("%" PRIu64 " rows data flushed to disk after been sorted:\n", pPage->num);
tColModelDisplay(pDesc->pColumnModel, pPage->data, pPage->numOfElems, pPage->numOfElems); tColModelDisplay(pDesc->pColumnModel, pPage->data, pPage->num, pPage->num);
#endif #endif
// write to cache after being sorted // write to cache after being sorted
if (tExtMemBufferPut(pMemoryBuf, pPage->data, pPage->numOfElems) < 0) { if (tExtMemBufferPut(pMemoryBuf, pPage->data, pPage->num) < 0) {
tscError("failed to save data in temporary buffer"); tscError("failed to save data in temporary buffer");
return -1; return -1;
} }
pPage->numOfElems = 0; pPage->num = 0;
return 0; return 0;
} }
@ -402,17 +425,17 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
int32_t numOfRows, int32_t orderType) { int32_t numOfRows, int32_t orderType) {
SColumnModel *pModel = pDesc->pColumnModel; SColumnModel *pModel = pDesc->pColumnModel;
if (pPage->numOfElems + numOfRows <= pModel->capacity) { if (pPage->num + numOfRows <= pModel->capacity) {
tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows); tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows);
return 0; return 0;
} }
// current buffer is overflow, flush data to extensive buffer // current buffer is overflow, flush data to extensive buffer
int32_t numOfRemainEntries = pModel->capacity - pPage->numOfElems; int32_t numOfRemainEntries = pModel->capacity - pPage->num;
tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows); tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows);
// current buffer is full, need to flushed to disk // current buffer is full, need to flushed to disk
assert(pPage->numOfElems == pModel->capacity); assert(pPage->num == pModel->capacity);
int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType); int32_t ret = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
if (ret != 0) { if (ret != 0) {
return -1; return -1;
@ -430,12 +453,12 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows); tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows);
if (pPage->numOfElems == pModel->capacity) { if (pPage->num == pModel->capacity) {
if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) { if (tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType) != TSDB_CODE_SUCCESS) {
return -1; return -1;
} }
} else { } else {
pPage->numOfElems = numOfWriteElems; pPage->num = numOfWriteElems;
} }
remain -= numOfWriteElems; remain -= numOfWriteElems;
@ -470,7 +493,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscTrace("%p waiting for delete procedure, status: %d", pSql, status); tscTrace("%p waiting for delete procedure, status: %d", pSql, status);
} }
taosDestoryInterpoInfo(&pLocalReducer->interpolationInfo); taosDestoryFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) { if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
@ -503,8 +526,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tfree(pLocalReducer->pLoserTree); tfree(pLocalReducer->pLoserTree);
} }
tfree(pLocalReducer->pBufForInterpo);
tfree(pLocalReducer->pFinalRes); tfree(pLocalReducer->pFinalRes);
tfree(pLocalReducer->discardData); tfree(pLocalReducer->discardData);
@ -740,7 +761,7 @@ int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSource *p
#if defined(_DEBUG_VIEW) #if defined(_DEBUG_VIEW)
printf("new page load to buffer\n"); printf("new page load to buffer\n");
tColModelDisplay(pOneInterDataSrc->pMemBuffer->pColumnModel, pOneInterDataSrc->filePage.data, tColModelDisplay(pOneInterDataSrc->pMemBuffer->pColumnModel, pOneInterDataSrc->filePage.data,
pOneInterDataSrc->filePage.numOfElems, pOneInterDataSrc->pMemBuffer->pColumnModel->capacity); pOneInterDataSrc->filePage.num, pOneInterDataSrc->pMemBuffer->pColumnModel->capacity);
#endif #endif
*needAdjustLoserTree = true; *needAdjustLoserTree = true;
} else { } else {
@ -761,7 +782,7 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
* since it's last record in buffer has been chosen to be processed, as the winner of loser-tree * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
*/ */
bool needToAdjust = true; bool needToAdjust = true;
if (pOneInterDataSrc->filePage.numOfElems <= pOneInterDataSrc->rowIdx) { if (pOneInterDataSrc->filePage.num <= pOneInterDataSrc->rowIdx) {
loadNewDataFromDiskFor(pLocalReducer, pOneInterDataSrc, &needToAdjust); loadNewDataFromDiskFor(pLocalReducer, pOneInterDataSrc, &needToAdjust);
} }
@ -788,22 +809,20 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
} }
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo,
SInterpolationInfo *pInterpoInfo) { SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information // discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision; int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec); taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
taosResetFillInfo(pFillInfo, revisedSTime);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
pLocalReducer->discard = true; pLocalReducer->discard = true;
pLocalReducer->discardData->numOfElems = 0; pLocalReducer->discardData->num = 0;
SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel; SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel;
tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1); tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1);
@ -856,6 +875,7 @@ static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRe
static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) { static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res; SSqlRes * pRes = &pSql->res;
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf; tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
@ -868,15 +888,15 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
assert(pRes->pLocalReducer == NULL); assert(pRes->pLocalReducer == NULL);
} }
if (pQueryInfo->intervalTime == 0 || pQueryInfo->interpoType == TSDB_INTERPO_NONE) { if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
// no interval query, no interpolation // no interval query, no fill operation
pRes->data = pLocalReducer->pFinalRes; pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->numOfElems; pRes->numOfRows = pFinalDataPage->num;
pRes->numOfTotalInCurrentClause += pRes->numOfRows; pRes->numOfTotalInCurrentClause += pRes->numOfRows;
if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) { if (pQueryInfo->limit.offset < pRes->numOfRows) {
int32_t prevSize = pFinalDataPage->numOfElems; int32_t prevSize = pFinalDataPage->num;
tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, pQueryInfo->limit.offset - 1); tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, pQueryInfo->limit.offset - 1);
/* remove the hole in column model */ /* remove the hole in column model */
@ -894,65 +914,40 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotalInCurrentClause > pQueryInfo->limit.limit) { if (pQueryInfo->limit.limit >= 0 && pRes->numOfTotalInCurrentClause > pQueryInfo->limit.limit) {
/* impose the limitation of output rows on the final result */ /* impose the limitation of output rows on the final result */
int32_t prevSize = pFinalDataPage->numOfElems; int32_t prevSize = pFinalDataPage->num;
int32_t overFlow = pRes->numOfTotalInCurrentClause - pQueryInfo->limit.limit; int32_t overFlow = pRes->numOfTotalInCurrentClause - pQueryInfo->limit.limit;
assert(overFlow < pRes->numOfRows); assert(overFlow < pRes->numOfRows);
pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit; pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit;
pRes->numOfRows -= overFlow; pRes->numOfRows -= overFlow;
pFinalDataPage->numOfElems -= overFlow; pFinalDataPage->num -= overFlow;
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize); tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
/* set remain data to be discarded, and reset the interpolation information */ /* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, &pLocalReducer->interpolationInfo); savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
} }
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList); int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize); memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
pFinalDataPage->numOfElems = 0; pFinalDataPage->num = 0;
return; return;
} }
int64_t *pPrimaryKeys = (int64_t *)pLocalReducer->pBufForInterpo; SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey);
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
int64_t actualETime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity); pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity);
} }
char ** srcData = (char **)malloc((POINTER_BYTES + sizeof(int32_t)) * pQueryInfo->fieldsInfo.numOfOutput);
int32_t *functions = (int32_t *)((char *)srcData + pQueryInfo->fieldsInfo.numOfOutput * sizeof(void *));
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
srcData[i] =
pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows;
functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId;
}
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
while (1) { while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo); int64_t newRows = -1;
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime, taosGenerateDataBlock(pFillInfo, pResPages, &newRows, pLocalReducer->resColModel->capacity);
pQueryInfo->slidingTimeUnit, precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
int32_t newRows = taosDoInterpoResult(pInterpoInfo, pQueryInfo->interpoType, pResPages, remains, nrows,
pQueryInfo->intervalTime, pPrimaryKeys, pLocalReducer->resColModel, srcData,
pQueryInfo->defaultVal, functions, pLocalReducer->resColModel->capacity);
assert(newRows <= nrows);
if (pQueryInfo->limit.offset < newRows) { if (pQueryInfo->limit.offset < newRows) {
newRows -= pQueryInfo->limit.offset; newRows -= pQueryInfo->limit.offset;
@ -975,16 +970,15 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
pQueryInfo->limit.offset -= newRows; pQueryInfo->limit.offset -= newRows;
pRes->numOfRows = 0; pRes->numOfRows = 0;
int32_t rpoints = taosNumOfRemainPoints(pInterpoInfo); int32_t rpoints = taosNumOfRemainRows(pFillInfo);
if (rpoints <= 0) { if (rpoints <= 0) {
if (!doneOutput) { if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted
/* reduce procedure is not completed, but current results for interpolation are exhausted */
break; break;
} }
/* all output for current group are completed */ /* all output for current group are completed */
int32_t totalRemainRows = int32_t totalRemainRows =
taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeys, rpoints, pQueryInfo->intervalTime, actualETime); taosGetNumOfResultWithFill(pFillInfo, rpoints, pFillInfo->slidingTime, actualETime);
if (totalRemainRows <= 0) { if (totalRemainRows <= 0) {
break; break;
} }
@ -1000,10 +994,10 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
assert(pRes->numOfRows >= 0); assert(pRes->numOfRows >= 0);
pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit; pRes->numOfTotalInCurrentClause = pQueryInfo->limit.limit;
pFinalDataPage->numOfElems -= overFlow; pFinalDataPage->num -= overFlow;
/* set remain data to be discarded, and reset the interpolation information */ /* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pInterpoInfo); savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
} }
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
@ -1017,18 +1011,17 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
} }
} }
pFinalDataPage->numOfElems = 0; pFinalDataPage->num = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
tfree(pResPages[i]); tfree(pResPages[i]);
} }
tfree(pResPages); tfree(pResPages);
free(srcData);
} }
static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) { static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
SColumnModel *pColumnModel = pLocalReducer->pDesc->pColumnModel; SColumnModel *pColumnModel = pLocalReducer->pDesc->pColumnModel;
assert(pColumnModel->capacity == 1 && tmpBuffer->numOfElems == 1); assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1);
// copy to previous temp buffer // copy to previous temp buffer
for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) { for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) {
@ -1038,7 +1031,7 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes); memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
} }
tmpBuffer->numOfElems = 0; tmpBuffer->num = 0;
pLocalReducer->hasPrevRow = true; pLocalReducer->hasPrevRow = true;
} }
@ -1168,7 +1161,7 @@ int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
pLocalReducer->hasPrevRow = false; pLocalReducer->hasPrevRow = false;
int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalReducer->pCtx); int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalReducer->pCtx);
pLocalReducer->pResultBuf->numOfElems += numOfRes; pLocalReducer->pResultBuf->num += numOfRes;
fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalReducer); fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalReducer);
return numOfRes; return numOfRes;
@ -1244,37 +1237,38 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
/* /*
* ignore the output of the current group since this group is skipped by user * Ignore the output of the current group since this group is skipped by user
* We set the numOfRows to be 0 and discard the possible remain results. * We set the numOfRows to be 0 and discard the possible remain results.
*/ */
if (pQueryInfo->slimit.offset > 0) { if (pQueryInfo->slimit.offset > 0) {
pRes->numOfRows = 0; pRes->numOfRows = 0;
pQueryInfo->slimit.offset -= 1; pQueryInfo->slimit.offset -= 1;
pLocalReducer->discard = !noMoreCurrentGroupRes; pLocalReducer->discard = !noMoreCurrentGroupRes;
return false; return false;
} }
tColModelCompact(pModel, pResBuf, pModel->capacity); tColModelCompact(pModel, pResBuf, pModel->capacity);
memcpy(pLocalReducer->pBufForInterpo, pResBuf->data, pLocalReducer->nResultBufSize); // memcpy(pLocalReducer->pBufForInterpo, pResBuf->data, pLocalReducer->nResultBufSize);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("final result before interpo:\n"); printf("final result before interpo:\n");
tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->numOfElems, pResBuf->numOfElems); assert(0);
// tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
#endif #endif
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t offset = getColumnModelOffset(pModel, startIndex + i);
SSchema *pSchema = getColumnModelSchema(pModel, startIndex + i); TSKEY ekey = taosGetRevisedEndKey(pQueryInfo->window.ekey, pFillInfo->order, pFillInfo->slidingTime,
pQueryInfo->slidingTimeUnit, tinfo.precision);
memcpy(pInterpoInfo->pTags[i], pLocalReducer->pBufForInterpo + offset * pResBuf->numOfElems, pSchema->bytes);
} taosFillSetStartInfo(pFillInfo, pResBuf->num, ekey);
taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
taosInterpoSetStartInfo(&pLocalReducer->interpolationInfo, pResBuf->numOfElems, pQueryInfo->interpoType);
doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes); doInterpolateResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
return true; return true;
} }
@ -1302,13 +1296,13 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
int8_t precision = tinfo.precision; int8_t precision = tinfo.precision;
// for group result interpolation, do not return if not data is generated // for group result interpolation, do not return if not data is generated
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t newTime = int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); taosGetIntervalStartTimestamp(skey, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime,
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime, // pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize);
pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize); taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
} }
} }
@ -1320,26 +1314,26 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SLocalReducer * pLocalReducer = pRes->pLocalReducer; SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t p = tinfo.precision; int8_t p = tinfo.precision;
if (taosHasRemainsDataForInterpolation(pInterpoInfo)) { if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) {
assert(pQueryInfo->interpoType != TSDB_INTERPO_NONE); assert(pQueryInfo->fillType != TSDB_FILL_NONE);
tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf; tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf;
int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pInterpoInfo->numOfRawDataInRows - 1)); int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
int32_t remain = taosNumOfRemainPoints(pInterpoInfo); int32_t remain = taosNumOfRemainRows(pFillInfo);
TSKEY ekey = TSKEY ekey = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, p);
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain, // the first column must be the timestamp column
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity); int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, false); doInterpolateResult(pSql, pLocalReducer, false);
} }
@ -1355,7 +1349,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SLocalReducer * pLocalReducer = pRes->pLocalReducer; SLocalReducer * pLocalReducer = pRes->pLocalReducer;
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo; SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow; bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
@ -1363,18 +1357,16 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL || if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) { prevGroupCompleted) {
// if interpoType == TSDB_INTERPO_NONE, return directly // if fillType == TSDB_FILL_NONE, return directly
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey; int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->slidingTimeUnit, precision); pQueryInfo->slidingTimeUnit, tinfo.precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime, int32_t rows = taosGetNumOfResultWithFill(pFillInfo, 0, etime, pLocalReducer->resColModel->capacity);
pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo if (rows > 0) { // do interpo
doInterpolateResult(pSql, pLocalReducer, true); doInterpolateResult(pSql, pLocalReducer, true);
} }
@ -1474,7 +1466,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index); printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif #endif
assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) &&
tmpBuffer->numOfElems == 0); tmpBuffer->num == 0);
// chosen from loser tree // chosen from loser tree
SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index]; SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index];
@ -1487,7 +1479,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SSrcColumnInfo colInfo[256] = {0}; SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo); tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->numOfElems, pModel->capacity, colInfo); tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo);
#endif #endif
if (pLocalReducer->discard) { if (pLocalReducer->discard) {
@ -1495,7 +1487,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
/* current record belongs to the same group of previous record, need to discard it */ /* current record belongs to the same group of previous record, need to discard it */
if (isSameGroup(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpBuffer)) { if (isSameGroup(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpBuffer)) {
tmpBuffer->numOfElems = 0; tmpBuffer->num = 0;
pOneDataSrc->rowIdx += 1; pOneDataSrc->rowIdx += 1;
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree); adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
@ -1509,7 +1501,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
continue; continue;
} else { } else {
pLocalReducer->discard = false; pLocalReducer->discard = false;
pLocalReducer->discardData->numOfElems = 0; pLocalReducer->discardData->num = 0;
if (saveGroupResultInfo(pSql)) { if (saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; pLocalReducer->status = TSC_LOCALREDUCE_READY;
@ -1538,17 +1530,17 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
tFilePage *pResBuf = pLocalReducer->pResultBuf; tFilePage *pResBuf = pLocalReducer->pResultBuf;
/* /*
* if the previous group does NOT generate any result (pResBuf->numOfElems == 0), * if the previous group does NOT generate any result (pResBuf->num == 0),
* continue to process results instead of return results. * continue to process results instead of return results.
*/ */
if ((!sameGroup && pResBuf->numOfElems > 0) || (pResBuf->numOfElems == pLocalReducer->resColModel->capacity)) { if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) {
// does not belong to the same group // does not belong to the same group
bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup); bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
// this row needs to discard, since it belongs to the group of previous // this row needs to discard, since it belongs to the group of previous
if (pLocalReducer->discard && sameGroup) { if (pLocalReducer->discard && sameGroup) {
pLocalReducer->hasUnprocessedRow = false; pLocalReducer->hasUnprocessedRow = false;
tmpBuffer->numOfElems = 0; tmpBuffer->num = 0;
} else { } else {
// current row does not belongs to the previous group, so it is not be handled yet. // current row does not belongs to the previous group, so it is not be handled yet.
pLocalReducer->hasUnprocessedRow = true; pLocalReducer->hasUnprocessedRow = true;
@ -1611,7 +1603,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
finalizeRes(pQueryInfo, pLocalReducer); finalizeRes(pQueryInfo, pLocalReducer);
} }
if (pLocalReducer->pResultBuf->numOfElems) { if (pLocalReducer->pResultBuf->num) {
doGenerateFinalResults(pSql, pLocalReducer, true); doGenerateFinalResults(pSql, pLocalReducer, true);
} }
@ -1641,6 +1633,6 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1; size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1;
pRes->pLocalReducer->pResultBuf = (tFilePage *)calloc(1, allocSize); pRes->pLocalReducer->pResultBuf = (tFilePage *)calloc(1, allocSize);
pRes->pLocalReducer->pResultBuf->numOfElems = numOfRes; pRes->pLocalReducer->pResultBuf->num = numOfRes;
pRes->data = pRes->pLocalReducer->pResultBuf->data; pRes->data = pRes->pLocalReducer->pResultBuf->data;
} }

View File

@ -672,7 +672,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->order = htons(pQueryInfo->order.order);
pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId);
pQueryMsg->interpoType = htons(pQueryInfo->interpoType); pQueryMsg->fillType = htons(pQueryInfo->fillType);
pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit);
pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset);
pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList)); pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList));
@ -800,7 +800,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
} }
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
*((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]); *((int64_t *)pMsg) = htobe64(pQueryInfo->defaultVal[i]);
pMsg += sizeof(pQueryInfo->defaultVal[0]); pMsg += sizeof(pQueryInfo->defaultVal[0]);

View File

@ -208,7 +208,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pStream->numOfRes == 0) { if (pStream->numOfRes == 0) {
if (pQueryInfo->interpoType == TSDB_INTERPO_SET_VALUE || pQueryInfo->interpoType == TSDB_INTERPO_NULL) { if (pQueryInfo->fillType == TSDB_FILL_SET_VALUE || pQueryInfo->fillType == TSDB_FILL_NULL) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
/* failed to retrieve any result in this retrieve */ /* failed to retrieve any result in this retrieve */

View File

@ -1390,7 +1390,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results // clear local saved number of results
trsupport->localBuffer->numOfElems = 0; trsupport->localBuffer->num = 0;
pthread_mutex_unlock(&trsupport->queryMutex); pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql,
@ -1457,7 +1457,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->numOfElems; uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num;
tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, tscTrace("%p sub:%p all data retrieved from ip:%u,vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx); numOfRowsFromSubquery, idx);
@ -1465,11 +1465,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->numOfElems); printf("%" PRIu64 " rows data flushed to disk:\n", trsupport->localBuffer->num);
SSrcColumnInfo colInfo[256] = {0}; SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo); tscGetSrcColumnInfo(colInfo, pQueryInfo);
tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->numOfElems, tColModelDisplayEx(pDesc->pColumnModel, trsupport->localBuffer->data, trsupport->localBuffer->num,
trsupport->localBuffer->numOfElems, colInfo); trsupport->localBuffer->num, colInfo);
#endif #endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) {

View File

@ -280,7 +280,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo) {
return; return;
} }
pQueryInfo->interpoType = TSDB_INTERPO_NONE; pQueryInfo->fillType = TSDB_FILL_NONE;
tfree(pQueryInfo->defaultVal); tfree(pQueryInfo->defaultVal);
} }
@ -1779,7 +1779,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
} }

View File

@ -140,19 +140,19 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
}; };
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 #define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 #define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 #define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 #define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
#define TSDB_ALTER_TABLE_ADD_COLUMN 5 #define TSDB_ALTER_TABLE_ADD_COLUMN 5
#define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_DROP_COLUMN 6
#define TSDB_INTERPO_NONE 0 #define TSDB_FILL_NONE 0
#define TSDB_INTERPO_NULL 1 #define TSDB_FILL_NULL 1
#define TSDB_INTERPO_SET_VALUE 2 #define TSDB_FILL_SET_VALUE 2
#define TSDB_INTERPO_LINEAR 3 #define TSDB_FILL_LINEAR 3
#define TSDB_INTERPO_PREV 4 #define TSDB_FILL_PREV 4
#define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_PRIVILEGES 0x2 #define TSDB_ALTER_USER_PRIVILEGES 0x2
@ -164,8 +164,8 @@ enum _mgmt_table {
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
#define TSDB_COL_NORMAL 0x0u #define TSDB_COL_NORMAL 0x0u
#define TSDB_COL_TAG 0x1u #define TSDB_COL_TAG 0x1u
#define TSDB_COL_JOIN 0x2u #define TSDB_COL_JOIN 0x2u
extern char *taosMsg[]; extern char *taosMsg[];
@ -439,7 +439,7 @@ typedef struct {
uint16_t queryType; // denote another query process uint16_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers int16_t numOfOutput; // final output columns numbers
int16_t tagNameRelType; // relation of tag criteria and tbname criteria int16_t tagNameRelType; // relation of tag criteria and tbname criteria
int16_t interpoType; // interpolate type int16_t fillType; // interpolate type
uint64_t defaultVal; // default value array list uint64_t defaultVal; // default value array list
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block int32_t tsLen; // total length of ts comp block

View File

@ -68,7 +68,7 @@ typedef struct SExtFileInfo {
} SExtFileInfo; } SExtFileInfo;
typedef struct tFilePage { typedef struct tFilePage {
uint64_t numOfElems; uint64_t num;
char data[]; char data[];
} tFilePage; } tFilePage;

View File

@ -24,18 +24,34 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "qextbuffer.h" #include "qextbuffer.h"
typedef struct SInterpolationInfo { typedef struct {
int64_t startTimestamp; STColumn col; // column info
int32_t order; // order [asc/desc] int16_t functionId; // sql function id
int32_t numOfRawDataInRows; // number of points in pQuery->sdata int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int32_t rowIdx; // rowIdx in pQuery->sdata union {int64_t i; double d;} defaultVal;
int32_t numOfTotalInterpo; // number of interpolated rows in one round } SFillColInfo;
int32_t numOfCurrentInterpo; // number of interpolated rows in current results
char * prevValues; // previous row of data typedef struct SFillInfo {
TSKEY start; // start timestamp
TSKEY endKey; // endKey for fill
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
int32_t fillType; // fill type
int32_t numOfRows; // number of rows in the input data block
int32_t rowIdx; // rowIdx
int32_t numOfTotal; // number of filled rows in one round
int32_t numOfCurrent; // number of filled rows in current results
int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
char ** pTags; // tags value for current interpolation
int64_t slidingTime; // sliding value to determine the number of result for a given time window
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data char * nextValues; // next row of data
int32_t numOfTags; SFillColInfo* pFillCol; // column info for fill operations
char ** pTags; // tags value for current interoplation char** pData; // original result data block involved in filling data
} SInterpolationInfo; } SFillInfo;
typedef struct SPoint { typedef struct SPoint {
int64_t key; int64_t key;
@ -44,49 +60,31 @@ typedef struct SPoint {
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision); int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision);
void taosInitInterpoInfo(SInterpolationInfo *pInterpoInfo, int32_t order, int64_t startTimeStamp, int32_t numOfTags, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity,
int32_t rowSize); int32_t numOfCols, int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol);
void taosDestoryInterpoInfo(SInterpolationInfo *pInterpoInfo); void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosInterpoSetStartInfo(SInterpolationInfo *pInterpoInfo, int32_t numOfRawDataInRows, int32_t type); void taosDestoryFillInfo(SFillInfo *pFillInfo);
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
/** void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput);
*
* @param pInterpoInfo
* @param pPrimaryKeyArray
* @param numOfRows
* @param nInterval
* @param ekey
* @param maxNumOfRows
* @return
*/
int32_t taosGetNumOfResultWithInterpo(SInterpolationInfo *pInterpoInfo, int64_t *pPrimaryKeyArray, int32_t numOfRows,
int64_t nInterval, int64_t ekey, int32_t maxNumOfRows);
int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo *pInterpoInfo, int64_t *pPrimaryKeyArray, void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput);
int32_t numOfRawDataInRows, int64_t nInterval, int64_t ekey);
/**
*
* @param pInterpoInfo
* @return
*/
bool taosHasRemainsDataForInterpolation(SInterpolationInfo *pInterpoInfo);
int32_t taosNumOfRemainPoints(SInterpolationInfo *pInterpoInfo); TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision);
/** int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows);
*
*/
int32_t taosDoInterpoResult(SInterpolationInfo *pInterpoInfo, int16_t interpoType, tFilePage **data,
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t *pPrimaryKeyArray, SColumnModel *pModel, char **srcData, int64_t *defaultVal,
const int32_t *functionIDs, int32_t bufSize);
int32_t taosNumOfRemainRows(SFillInfo *pFillInfo);
int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData);
int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point); int taosDoLinearInterpolation(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -28,10 +28,10 @@
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tarray.h" #include "tarray.h"
typedef struct SData { //typedef struct tFilePage {
int32_t num; // int64_t num;
char data[]; // char data[];
} SData; //} tFilePage;
struct SColumnFilterElem; struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2); typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
@ -129,7 +129,7 @@ typedef struct SQuery {
char slidingTimeUnit; // interval data type, used for daytime revise char slidingTimeUnit; // interval data type, used for daytime revise
int8_t precision; int8_t precision;
int16_t numOfOutput; int16_t numOfOutput;
int16_t interpoType; int16_t fillType;
int16_t checkBuffer; // check if the buffer is full during scan each block int16_t checkBuffer; // check if the buffer is full during scan each block
SLimitVal limit; SLimitVal limit;
int32_t rowSize; int32_t rowSize;
@ -139,11 +139,10 @@ typedef struct SQuery {
SColumnInfo* tagColList; SColumnInfo* tagColList;
int32_t numOfFilterCols; int32_t numOfFilterCols;
int64_t* defaultVal; int64_t* defaultVal;
// TSKEY lastKey;
uint32_t status; // query status uint32_t status; // query status
SResultRec rec; SResultRec rec;
int32_t pos; int32_t pos;
SData** sdata; tFilePage** sdata;
STableQueryInfo* current; STableQueryInfo* current;
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
@ -151,12 +150,11 @@ typedef struct SQuery {
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SData** pInterpoBuf;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo; SFillInfo* pFillInfo;
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
STSBuf* pTSBuf; STSBuf* pTSBuf;
STSCursor cur; STSCursor cur;

View File

@ -136,7 +136,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
} }
item->pNext = NULL; item->pNext = NULL;
item->item.numOfElems = 0; item->item.num = 0;
if (pMemBuffer->pTail != NULL) { if (pMemBuffer->pTail != NULL) {
pMemBuffer->pTail->pNext = item; pMemBuffer->pTail->pNext = item;
@ -167,13 +167,13 @@ int16_t tExtMemBufferPut(tExtMemBuffer *pMemBuffer, void *data, int32_t numOfRow
pLast = pMemBuffer->pTail; pLast = pMemBuffer->pTail;
} }
if (pLast->item.numOfElems + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records if (pLast->item.num + numOfRows <= pMemBuffer->numOfElemsPerPage) { // enough space for records
tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRows, numOfRows); tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRows, numOfRows);
pMemBuffer->numOfElemsInBuffer += numOfRows; pMemBuffer->numOfElemsInBuffer += numOfRows;
pMemBuffer->numOfTotalElems += numOfRows; pMemBuffer->numOfTotalElems += numOfRows;
} else { } else {
int32_t numOfRemainEntries = pMemBuffer->numOfElemsPerPage - pLast->item.numOfElems; int32_t numOfRemainEntries = pMemBuffer->numOfElemsPerPage - pLast->item.num;
tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRemainEntries, numOfRows); tColModelAppend(pMemBuffer->pColumnModel, &pLast->item, data, 0, numOfRemainEntries, numOfRows);
pMemBuffer->numOfElemsInBuffer += numOfRemainEntries; pMemBuffer->numOfElemsInBuffer += numOfRemainEntries;
@ -271,7 +271,7 @@ bool tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
ret = false; ret = false;
} }
pMemBuffer->fileMeta.numOfElemsInFile += first->item.numOfElems; pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;
pMemBuffer->fileMeta.nFileSize += 1; pMemBuffer->fileMeta.nFileSize += 1;
tFilePagesItem *ptmp = first; tFilePagesItem *ptmp = first;
@ -985,16 +985,16 @@ void tColModelDisplayEx(SColumnModel *pModel, void *pData, int32_t numOfRows, in
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) { void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity) {
if (inputBuffer->numOfElems == 0 || maxElemsCapacity == inputBuffer->numOfElems) { if (inputBuffer->num == 0 || maxElemsCapacity == inputBuffer->num) {
return; return;
} }
/* start from the second column */ /* start from the second column */
for (int32_t i = 1; i < pModel->numOfCols; ++i) { for (int32_t i = 1; i < pModel->numOfCols; ++i) {
SSchemaEx* pSchemaEx = &pModel->pFields[i]; SSchemaEx* pSchemaEx = &pModel->pFields[i];
memmove(inputBuffer->data + pSchemaEx->offset * inputBuffer->numOfElems, memmove(inputBuffer->data + pSchemaEx->offset * inputBuffer->num,
inputBuffer->data + pSchemaEx->offset * maxElemsCapacity, inputBuffer->data + pSchemaEx->offset * maxElemsCapacity,
pSchemaEx->field.bytes * inputBuffer->numOfElems); pSchemaEx->field.bytes * inputBuffer->num);
} }
} }
@ -1009,13 +1009,13 @@ int16_t getColumnModelOffset(SColumnModel *pColumnModel, int32_t index) {
} }
void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockCapacity, int32_t s, int32_t e) { void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockCapacity, int32_t s, int32_t e) {
if (inputBuffer->numOfElems == 0 || (e - s + 1) <= 0) { if (inputBuffer->num == 0 || (e - s + 1) <= 0) {
return; return;
} }
int32_t removed = e - s + 1; int32_t removed = e - s + 1;
int32_t remain = inputBuffer->numOfElems - removed; int32_t remain = inputBuffer->num - removed;
int32_t secPart = inputBuffer->numOfElems - e - 1; int32_t secPart = inputBuffer->num - e - 1;
/* start from the second column */ /* start from the second column */
for (int32_t i = 0; i < pModel->numOfCols; ++i) { for (int32_t i = 0; i < pModel->numOfCols; ++i) {
@ -1028,7 +1028,7 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
memmove(startPos, endPos, pSchema->bytes * secPart); memmove(startPos, endPos, pSchema->bytes * secPart);
} }
inputBuffer->numOfElems = remain; inputBuffer->num = remain;
} }
/* /*
@ -1040,16 +1040,16 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
*/ */
void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t start, int32_t numOfRows, void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t start, int32_t numOfRows,
int32_t srcCapacity) { int32_t srcCapacity) {
assert(dstPage->numOfElems + numOfRows <= dstModel->capacity); assert(dstPage->num + numOfRows <= dstModel->capacity);
for (int32_t col = 0; col < dstModel->numOfCols; ++col) { for (int32_t col = 0; col < dstModel->numOfCols; ++col) {
char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->numOfElems, col); char *dst = COLMODEL_GET_VAL(dstPage->data, dstModel, dstModel->capacity, dstPage->num, col);
char *src = COLMODEL_GET_VAL((char *)srcData, dstModel, srcCapacity, start, col); char *src = COLMODEL_GET_VAL((char *)srcData, dstModel, srcCapacity, start, col);
memmove(dst, src, dstModel->pFields[col].field.bytes * numOfRows); memmove(dst, src, dstModel->pFields[col].field.bytes * numOfRows);
} }
dstPage->numOfElems += numOfRows; dstPage->num += numOfRows;
} }
tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrderCols, SColumnModel *pModel, tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrderCols, SColumnModel *pModel,

View File

@ -20,7 +20,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSDB_ORDER_ASC) #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) { int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char intervalTimeUnit, int16_t precision) {
if (timeRange == 0) { if (timeRange == 0) {
@ -37,6 +37,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char
* TODO dynamically decide the start time of a day * TODO dynamically decide the start time of a day
*/ */
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900 #if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019 // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone; int64_t timezone = _timezone;
@ -56,130 +57,169 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t timeRange, char
} }
} }
void taosInitInterpoInfo(SInterpolationInfo* pInterpoInfo, int32_t order, int64_t startTimestamp, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int32_t numOfGroupbyTags, int32_t rowSize) { int64_t slidingTime, int32_t fillType, SFillColInfo* pFillCol) {
pInterpoInfo->startTimestamp = startTimestamp; if (fillType == TSDB_FILL_NONE) {
pInterpoInfo->rowIdx = -1; return NULL;
pInterpoInfo->numOfRawDataInRows = 0;
pInterpoInfo->numOfCurrentInterpo = 0;
pInterpoInfo->numOfTotalInterpo = 0;
pInterpoInfo->order = order;
pInterpoInfo->numOfTags = numOfGroupbyTags;
if (pInterpoInfo->pTags == NULL && numOfGroupbyTags > 0) {
pInterpoInfo->pTags = calloc(1, numOfGroupbyTags * POINTER_BYTES + rowSize);
} }
// set the previous value to be null SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo));
tfree(pInterpoInfo->prevValues);
taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order;
pFillInfo->fillType = fillType;
pFillInfo->pFillCol = pFillCol;
pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols;
pFillInfo->slidingTime = slidingTime;
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
int32_t rowsize = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes = pFillInfo->pFillCol[i].col.bytes;
pFillInfo->pData[i] = calloc(1, sizeof(tFilePage) + bytes * capacity);
rowsize += bytes;
}
if (numOfTags > 0) {
pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize);
}
pFillInfo->rowSize = rowsize;
return pFillInfo;
} }
// the SInterpolationInfo itself will not be released void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
void taosDestoryInterpoInfo(SInterpolationInfo* pInterpoInfo) { pFillInfo->start = startTimestamp;
if (pInterpoInfo == NULL) { pFillInfo->rowIdx = -1;
pFillInfo->numOfRows = 0;
pFillInfo->numOfCurrent = 0;
pFillInfo->numOfTotal = 0;
}
void taosDestoryFillInfo(SFillInfo* pFillInfo) {
if (pFillInfo == NULL) {
return; return;
} }
tfree(pInterpoInfo->prevValues); tfree(pFillInfo->prevValues);
tfree(pInterpoInfo->nextValues); tfree(pFillInfo->nextValues);
tfree(pFillInfo->pTags);
tfree(pInterpoInfo->pTags); tfree(pFillInfo);
} }
void taosInterpoSetStartInfo(SInterpolationInfo* pInterpoInfo, int32_t numOfRawDataInRows, int32_t type) { void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
if (type == TSDB_INTERPO_NONE) { if (pFillInfo->fillType == TSDB_FILL_NONE) {
return; return;
} }
pInterpoInfo->rowIdx = 0; pFillInfo->rowIdx = 0;
pInterpoInfo->numOfRawDataInRows = numOfRawDataInRows; pFillInfo->numOfRows = numOfRows;
pFillInfo->endKey = endKey;
} }
TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int32_t timeInterval, int8_t intervalTimeUnit, int8_t precision) { void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput) {
if (order == TSDB_ORDER_ASC) { // copy the data into source data buffer
return ekey; for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
} else { memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
return taosGetIntervalStartTimestamp(ekey, timeInterval, intervalTimeUnit, precision);
} }
} }
int32_t taosGetNumOfResultWithInterpo(SInterpolationInfo* pInterpoInfo, TSKEY* pPrimaryKeyArray, void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
int32_t numOfRawDataInRows, int64_t nInterval, int64_t ekey, assert(pFillInfo->numOfRows == pInput->num);
int32_t maxNumOfRows) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
int32_t numOfRes = taosGetNumOfResWithoutLimit(pInterpoInfo, pPrimaryKeyArray, numOfRawDataInRows, nInterval, ekey); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
} char* s = pInput->data + pCol->col.offset * pInput->num;
memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes);
int32_t taosGetNumOfResWithoutLimit(SInterpolationInfo* pInterpoInfo, int64_t* pPrimaryKeyArray,
int32_t numOfAvailRawData, int64_t nInterval, int64_t ekey) { if (pCol->flag == TSDB_COL_TAG) { // copy the tag value
if (numOfAvailRawData > 0) { memcpy(pFillInfo->pTags[i], pFillInfo->pData[i], pCol->col.bytes);
int32_t finalNumOfResult = 0;
// get last timestamp, calculate the result size
int64_t lastKey = pPrimaryKeyArray[pInterpoInfo->numOfRawDataInRows - 1];
finalNumOfResult = (int32_t)(labs(lastKey - pInterpoInfo->startTimestamp) / nInterval) + 1;
assert(finalNumOfResult >= numOfAvailRawData);
return finalNumOfResult;
} else {
/* reach the end of data */
if ((ekey < pInterpoInfo->startTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) ||
(ekey > pInterpoInfo->startTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) {
return 0;
} else {
return (int32_t)(labs(ekey - pInterpoInfo->startTimestamp) / nInterval) + 1;
} }
} }
} }
bool taosHasRemainsDataForInterpolation(SInterpolationInfo* pInterpoInfo) { TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterval, int8_t slidingTimeUnit, int8_t precision) {
return taosNumOfRemainPoints(pInterpoInfo) > 0; if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
}
} }
int32_t taosNumOfRemainPoints(SInterpolationInfo* pInterpoInfo) { static int32_t taosGetTotalNumOfFilledRes(SFillInfo* pFillInfo, const TSKEY* tsArray, int32_t remain,
if (pInterpoInfo->rowIdx == -1 || pInterpoInfo->numOfRawDataInRows == 0) { int64_t nInterval, int64_t ekey) {
if (remain > 0) { // still fill gap within current data block, not generating data after the result set.
TSKEY lastKey = tsArray[pFillInfo->numOfRows - 1];
int32_t total = (int32_t)(labs(lastKey - pFillInfo->start) / nInterval) + 1;
assert(total >= remain);
return total;
} else { // reach the end of data
if ((ekey < pFillInfo->start && FILL_IS_ASC_FILL(pFillInfo)) ||
(ekey > pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo))) {
return 0;
} else {
return (int32_t)(labs(ekey - pFillInfo->start) / nInterval) + 1;
}
}
}
int32_t taosGetNumOfResultWithFill(SFillInfo* pFillInfo, int32_t numOfRows, int64_t ekey, int32_t maxNumOfRows) {
int32_t numOfRes = taosGetTotalNumOfFilledRes(pFillInfo, (int64_t*) pFillInfo->pData[0], numOfRows,
pFillInfo->slidingTime, ekey);
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
}
int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
if (pFillInfo->rowIdx == -1 || pFillInfo->numOfRows == 0) {
return 0; return 0;
} }
return INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? (pInterpoInfo->numOfRawDataInRows - pInterpoInfo->rowIdx) return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx)
: pInterpoInfo->rowIdx + 1; : pFillInfo->rowIdx + 1;
} }
static double doLinearInterpolationImpl(double v1, double v2, double k1, double k2, double k) { static double linearInterpolationImpl(double v1, double v2, double k1, double k2, double k) {
return v1 + (v2 - v1) * (k - k1) / (k2 - k1); return v1 + (v2 - v1) * (k - k1) / (k2 - k1);
} }
int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) { int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
*(int32_t*)point->val = doLinearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key, *(int32_t*)point->val = linearInterpolationImpl(*(int32_t*)point1->val, *(int32_t*)point2->val, point1->key,
point2->key, point->key); point2->key, point->key);
break; break;
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
*(float*)point->val = *(float*)point->val =
doLinearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key); linearInterpolationImpl(*(float*)point1->val, *(float*)point2->val, point1->key, point2->key, point->key);
break; break;
}; };
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
*(double*)point->val = *(double*)point->val =
doLinearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key); linearInterpolationImpl(*(double*)point1->val, *(double*)point2->val, point1->key, point2->key, point->key);
break; break;
}; };
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
*(int64_t*)point->val = doLinearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key, *(int64_t*)point->val = linearInterpolationImpl(*(int64_t*)point1->val, *(int64_t*)point2->val, point1->key,
point2->key, point->key); point2->key, point->key);
break; break;
}; };
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
*(int16_t*)point->val = doLinearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key, *(int16_t*)point->val = linearInterpolationImpl(*(int16_t*)point1->val, *(int16_t*)point2->val, point1->key,
point2->key, point->key); point2->key, point->key);
break; break;
}; };
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
*(int8_t*)point->val = *(int8_t*)point->val =
doLinearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key); linearInterpolationImpl(*(int8_t*)point1->val, *(int8_t*)point2->val, point1->key, point2->key, point->key);
break; break;
}; };
default: { default: {
@ -191,239 +231,247 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
return 0; return 0;
} }
static char* getPos(char* data, int32_t bytes, int32_t index) { return data + index * bytes; } static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) {
for (int32_t j = 0, i = start; i < pColInfo->numOfCols + pColInfo->numOfTags; ++i, ++j) {
static void setTagsValueInInterpolation(tFilePage** data, char** pTags, SColumnModel* pModel, int32_t order, SFillColInfo* pCol = &pColInfo->pFillCol[i];
int32_t start, int32_t capacity, int32_t num) {
for (int32_t j = 0, i = start; i < pModel->numOfCols; ++i, ++j) { char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
SSchema* pSchema = getColumnModelSchema(pModel, i); assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type);
char* val1 = getPos(data[i]->data, pSchema->bytes, num);
assignVal(val1, pTags[j], pSchema->bytes, pSchema->type);
} }
} }
static void doInterpoResultImpl(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
SColumnModel* pModel, int32_t* num, char** srcData, int64_t nInterval, int64_t ts, char** pTags, bool outOfBound) {
int64_t* defaultVal, int64_t currentTimestamp, int32_t capacity, int32_t numOfTags, char** prevValues = &pFillInfo->prevValues;
char** pTags, bool outOfBound) { char** nextValues = &pFillInfo->nextValues;
char** prevValues = &pInterpoInfo->prevValues;
char** nextValues = &pInterpoInfo->nextValues;
SPoint point1, point2, point; SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
char* val = getPos(data[0]->data, TSDB_KEYSIZE, *num); char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num);
*(TSKEY*)val = pInterpoInfo->startTimestamp; *(TSKEY*) val = pFillInfo->start;
int32_t numOfValCols = pModel->numOfCols - numOfTags; int32_t numOfValCols = pFillInfo->numOfCols - pFillInfo->numOfTags;
// set the other values // set the other values
if (interpoType == TSDB_INTERPO_PREV) { if (pFillInfo->fillType == TSDB_FILL_PREV) {
char* pInterpolationData = INTERPOL_IS_ASC_INTERPOL(pInterpoInfo) ? *prevValues : *nextValues; char* pInterpolationData = FILL_IS_ASC_FILL(pFillInfo) ? *prevValues : *nextValues;
if (pInterpolationData != NULL) { if (pInterpolationData != NULL) {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
int16_t offset = getColumnModelOffset(pModel, i);
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
char* val1 = getPos(data[i]->data, pSchema->bytes, *num); if (isNull(pInterpolationData + pCol->col.offset, pCol->col.type)) {
setNull(val1, pCol->col.type, pCol->col.bytes);
if (isNull(pInterpolationData + offset, pSchema->type)) {
setNull(val1, pSchema->type, pSchema->bytes);
} else { } else {
assignVal(val1, pInterpolationData + offset, pSchema->bytes, pSchema->type); assignVal(val1, pInterpolationData + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} }
} }
} else { /* no prev value yet, set the value for null */ } else { // no prev value yet, set the value for NULL
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = getPos(data[i]->data, pSchema->bytes, *num); char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
setNull(val1, pSchema->type, pSchema->bytes); setNull(val1, pCol->col.type, pCol->col.bytes);
} }
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} else if (interpoType == TSDB_INTERPO_LINEAR) { } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
// TODO : linear interpolation supports NULL value // TODO : linear interpolation supports NULL value
if (*prevValues != NULL && !outOfBound) { if (*prevValues != NULL && !outOfBound) {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
int16_t offset = getColumnModelOffset(pModel, i);
int16_t type = pCol->col.type;
int16_t type = pSchema->type; int16_t bytes = pCol->col.bytes;
char* val1 = getPos(data[i]->data, pSchema->bytes, *num);
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, type, pSchema->bytes); setNull(val1, pCol->col.type, bytes);
continue; continue;
} }
point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + offset}; point1 = (SPoint){.key = *(TSKEY*)(*prevValues), .val = *prevValues + pCol->col.offset};
point2 = (SPoint){.key = currentTimestamp, .val = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->rowIdx * bytes};
point = (SPoint){.key = pInterpoInfo->startTimestamp, .val = val1}; point = (SPoint){.key = pFillInfo->start, .val = val1};
taosDoLinearInterpolation(type, &point1, &point2, &point); taosDoLinearInterpolation(type, &point1, &point2, &point);
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} else { } else {
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = getPos(data[i]->data, pSchema->bytes, *num); char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
setNull(val1, pSchema->type, pSchema->bytes); setNull(val1, pCol->col.type, pCol->col.bytes);
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} }
} else { /* default value interpolation */ } else { /* default value interpolation */
for (int32_t i = 1; i < numOfValCols; ++i) { for (int32_t i = 1; i < numOfValCols; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* val1 = getPos(data[i]->data, pSchema->bytes, *num); char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, *num);
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type); assignVal(val1, (char*)&pCol->defaultVal.i, pCol->col.bytes, pCol->col.type);
} }
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, numOfValCols, capacity, *num); setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
} }
pInterpoInfo->startTimestamp += (nInterval * step); pFillInfo->start += (pFillInfo->slidingTime * step);
pInterpoInfo->numOfCurrentInterpo++; pFillInfo->numOfCurrent++;
(*num) += 1; (*num) += 1;
} }
static void initBeforeAfterDataBuf(SColumnModel* pModel, char** nextValues) { static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** nextValues) {
if (*nextValues != NULL) { if (*nextValues != NULL) {
return; return;
} }
*nextValues = calloc(1, pModel->rowSize); *nextValues = calloc(1, pFillInfo->rowSize);
for (int i = 1; i < pModel->numOfCols; i++) { for (int i = 1; i < pFillInfo->numOfCols; i++) {
int16_t offset = getColumnModelOffset(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
SSchema* pSchema = getColumnModelSchema(pModel, i); setNull(*nextValues + pCol->col.offset, pCol->col.type, pCol->col.bytes);
setNull(*nextValues + offset, pSchema->type, pSchema->bytes);
} }
} }
int32_t taosDoInterpoResult(SInterpolationInfo* pInterpoInfo, int16_t interpoType, tFilePage** data, int32_t taosDoInterpoResult(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfRows, int32_t outputRows, char** srcData) {
int32_t numOfRawDataInRows, int32_t outputRows, int64_t nInterval,
const int64_t* pPrimaryKeyArray, SColumnModel* pModel, char** srcData, int64_t* defaultVal,
const int32_t* functionIDs, int32_t bufSize) {
int32_t num = 0; int32_t num = 0;
pInterpoInfo->numOfCurrentInterpo = 0; pFillInfo->numOfCurrent = 0;
char** prevValues = &pInterpoInfo->prevValues; char** prevValues = &pFillInfo->prevValues;
char** nextValues = &pInterpoInfo->nextValues; char** nextValues = &pFillInfo->nextValues;
int32_t numOfTags = pInterpoInfo->numOfTags; int32_t numOfTags = pFillInfo->numOfTags;
char** pTags = pInterpoInfo->pTags; char** pTags = pFillInfo->pTags;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInterpoInfo->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
if (numOfRawDataInRows == 0) { if (numOfRows == 0) {
/* /*
* we need to rebuild whole data * we need to rebuild whole result set
* NOTE:we need to keep the last saved data, to satisfy the interpolation * NOTE:we need to keep the last saved data, to generated the filled data
*/ */
while (num < outputRows) { while (num < outputRows) {
doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal, doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
pInterpoInfo->startTimestamp, bufSize, numOfTags, pTags, true);
} }
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo;
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows; return outputRows;
} else { } else {
while (1) { while (1) {
int64_t currentTimestamp = pPrimaryKeyArray[pInterpoInfo->rowIdx]; int64_t ts = ((int64_t*)pFillInfo->pData[0])[pFillInfo->rowIdx];
if ((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || if ((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) {
/* set the next value for interpolation */ /* set the next value for interpolation */
initBeforeAfterDataBuf(pModel, nextValues); initBeforeAfterDataBuf(pFillInfo, nextValues);
int32_t offset = pInterpoInfo->rowIdx; int32_t offset = pFillInfo->rowIdx;
for (int32_t tlen = 0, i = 0; i < pModel->numOfCols - numOfTags; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols - numOfTags; ++i) {
SSchema* pSchema = getColumnModelSchema(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
memcpy(*nextValues + pCol->col.offset, srcData[i] + offset * pCol->col.bytes, pCol->col.bytes);
memcpy(*nextValues + tlen, srcData[i] + offset * pSchema->bytes, pSchema->bytes);
tlen += pSchema->bytes;
} }
} }
if (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || if (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
num < outputRows) {
while (((pInterpoInfo->startTimestamp < currentTimestamp && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
(pInterpoInfo->startTimestamp > currentTimestamp && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) && (pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
num < outputRows) { doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, false);
doInterpoResultImpl(pInterpoInfo, interpoType, data, pModel, &num, srcData, nInterval, defaultVal,
currentTimestamp, bufSize, numOfTags, pTags, false);
} }
/* output buffer is full, abort */ /* output buffer is full, abort */
if ((num == outputRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || if ((num == outputRows && FILL_IS_ASC_FILL(pFillInfo)) ||
(num < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo))) { (num < 0 && !FILL_IS_ASC_FILL(pFillInfo))) {
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return outputRows; return outputRows;
} }
} else { } else {
assert(pInterpoInfo->startTimestamp == currentTimestamp); assert(pFillInfo->start == ts);
initBeforeAfterDataBuf(pFillInfo, prevValues);
initBeforeAfterDataBuf(pModel, prevValues);
// assign rows to dst buffer // assign rows to dst buffer
int32_t i = 0; int32_t i = 0;
for (int32_t tlen = 0; i < pModel->numOfCols - numOfTags; ++i) { for (; i < pFillInfo->numOfCols - numOfTags; ++i) {
int16_t offset = getColumnModelOffset(pModel, i); SFillColInfo* pCol = &pFillInfo->pFillCol[i];
SSchema* pSchema = getColumnModelSchema(pModel, i);
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
char* val1 = getPos(data[i]->data, pSchema->bytes, num); char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->rowIdx);
char* src = srcData[i] + pInterpoInfo->rowIdx * pSchema->bytes;
if (i == 0 || if (i == 0 ||
(functionIDs[i] != TSDB_FUNC_COUNT && !isNull(src, pSchema->type)) || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) ||
(functionIDs[i] == TSDB_FUNC_COUNT && *(int64_t*)(src) != 0)) { (pCol->functionId == TSDB_FUNC_COUNT && GET_INT64_VAL(src) != 0)) {
assignVal(val1, src, pSchema->bytes, pSchema->type); assignVal(val1, src, pCol->col.bytes, pCol->col.type);
memcpy(*prevValues + tlen, src, pSchema->bytes); memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
} else { // i > 0 and data is null , do interpolation } else { // i > 0 and data is null , do interpolation
if (interpoType == TSDB_INTERPO_PREV) { if (pFillInfo->fillType == TSDB_FILL_PREV) {
assignVal(val1, *prevValues + offset, pSchema->bytes, pSchema->type); assignVal(val1, *prevValues + pCol->col.offset, pCol->col.bytes, pCol->col.type);
} else if (interpoType == TSDB_INTERPO_LINEAR) { } else if (pFillInfo->fillType == TSDB_FILL_LINEAR) {
assignVal(val1, src, pSchema->bytes, pSchema->type); assignVal(val1, src, pCol->col.bytes, pCol->col.type);
memcpy(*prevValues + tlen, src, pSchema->bytes); memcpy(*prevValues + pCol->col.offset, src, pCol->col.bytes);
} else { } else {
assignVal(val1, (char*)&defaultVal[i], pSchema->bytes, pSchema->type); assignVal(val1, (char*) &pCol->defaultVal.i, pCol->col.bytes, pCol->col.type);
} }
} }
tlen += pSchema->bytes;
} }
/* set the tag value for final result */ // set the tag value for final result
setTagsValueInInterpolation(data, pTags, pModel, pInterpoInfo->order, pModel->numOfCols - numOfTags, bufSize, setTagsValue(pFillInfo, data, pTags, pFillInfo->numOfCols - numOfTags, num);
num);
pInterpoInfo->startTimestamp += (nInterval * step); pFillInfo->start += (pFillInfo->slidingTime * step);
pInterpoInfo->rowIdx += 1; pFillInfo->rowIdx += 1;
num += 1; num += 1;
} }
if ((pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows && INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || if ((pFillInfo->rowIdx >= pFillInfo->numOfRows && FILL_IS_ASC_FILL(pFillInfo)) ||
(pInterpoInfo->rowIdx < 0 && !INTERPOL_IS_ASC_INTERPOL(pInterpoInfo)) || num >= outputRows) { (pFillInfo->rowIdx < 0 && !FILL_IS_ASC_FILL(pFillInfo)) || num >= outputRows) {
if (pInterpoInfo->rowIdx >= pInterpoInfo->numOfRawDataInRows || pInterpoInfo->rowIdx < 0) { if (pFillInfo->rowIdx >= pFillInfo->numOfRows || pFillInfo->rowIdx < 0) {
pInterpoInfo->rowIdx = -1; pFillInfo->rowIdx = -1;
pInterpoInfo->numOfRawDataInRows = 0; pFillInfo->numOfRows = 0;
/* the raw data block is exhausted, next value does not exists */ /* the raw data block is exhausted, next value does not exists */
tfree(*nextValues); tfree(*nextValues);
} }
pInterpoInfo->numOfTotalInterpo += pInterpoInfo->numOfCurrentInterpo; pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
return num; return num;
} }
} }
} }
} }
void taosFillInfoSetSource(SFillInfo* pFillInfo, tFilePage **data, TSKEY endKey) {
pFillInfo->endKey = endKey;
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
memcpy(pFillInfo->pData[i], data[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
}
}
void taosGenerateDataBlock(SFillInfo* pFillInfo, tFilePage** output, int64_t* outputRows, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo); // todo use iterator?
// TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
// pQuery->slidingTimeUnit, pQuery->precision);
// if (QUERY_IS_ASC_QUERY(pQuery)) {
// assert(ekey >= pQuery->window.ekey);
// } else {
// assert(ekey <= pQuery->window.ekey);
// }
int32_t rows = taosGetNumOfResultWithFill(pFillInfo, remain, pFillInfo->endKey, capacity);
int32_t numOfRes = taosDoInterpoResult(pFillInfo, output, remain, rows, pFillInfo->pData);
*outputRows = rows;
assert(numOfRes == rows);
}

View File

@ -64,26 +64,26 @@ static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx,
for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) { for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) {
ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(ret); UNUSED(ret);
assert(pPage->numOfElems > 0); assert(pPage->num > 0);
tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, pPage->numOfElems, pPage->numOfElems); tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, pPage->num, pPage->num);
printf("id: %d count: %" PRIu64 "\n", j, buffer->numOfElems); printf("id: %d count: %" PRIu64 "\n", j, buffer->num);
} }
} }
tfree(pPage); tfree(pPage);
assert(buffer->numOfElems == pMemBuffer->fileMeta.numOfElemsInFile); assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile);
} }
// load data in pMemBuffer to buffer // load data in pMemBuffer to buffer
tFilePagesItem *pListItem = pMemBuffer->pHead; tFilePagesItem *pListItem = pMemBuffer->pHead;
while (pListItem != NULL) { while (pListItem != NULL) {
tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, pListItem->item.numOfElems, tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, pListItem->item.num,
pListItem->item.numOfElems); pListItem->item.num);
pListItem = pListItem->pNext; pListItem = pListItem->pNext;
} }
tColDataQSort(pDesc, buffer->numOfElems, 0, buffer->numOfElems - 1, buffer->data, TSDB_ORDER_ASC); tColDataQSort(pDesc, buffer->num, 0, buffer->num - 1, buffer->data, TSDB_ORDER_ASC);
pDesc->pColumnModel->capacity = oldCapacity; // restore value pDesc->pColumnModel->capacity = oldCapacity; // restore value
return buffer; return buffer;
@ -881,7 +881,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(ret); UNUSED(ret);
tMemBucketPut(pMemBucket, pPage->data, pPage->numOfElems); tMemBucketPut(pMemBucket, pPage->data, pPage->num);
} }
fclose(pMemBuffer->file); fclose(pMemBuffer->file);

View File

@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <qinterpolation.h>
#include "os.h" #include "os.h"
#include "hash.h" #include "hash.h"
@ -529,10 +530,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
pageId = getLastPageId(&list); pageId = getLastPageId(&list);
pData = getResultBufferPageById(pResultBuf, pageId); pData = getResultBufferPageById(pResultBuf, pageId);
if (pData->numOfElems >= numOfRowsPerPage) { if (pData->num >= numOfRowsPerPage) {
pData = getNewDataBuf(pResultBuf, sid, &pageId); pData = getNewDataBuf(pResultBuf, sid, &pageId);
if (pData != NULL) { if (pData != NULL) {
assert(pData->numOfElems == 0); // number of elements must be 0 for new allocated buffer assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
} }
} }
} }
@ -544,7 +545,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
// set the number of rows in current disk page // set the number of rows in current disk page
if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer if (pWindowRes->pos.pageId == -1) { // not allocated yet, allocate new buffer
pWindowRes->pos.pageId = pageId; pWindowRes->pos.pageId = pageId;
pWindowRes->pos.rowId = pData->numOfElems++; pWindowRes->pos.rowId = pData->num++;
} }
return 0; return 0;
@ -1202,6 +1203,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
while (1) { while (1) {
getNextTimeWindow(pQuery, &nextWin); getNextTimeWindow(pQuery, &nextWin);
assert(pWindowResInfo->startTime <= nextWin.skey);
if (pWindowResInfo->startTime > nextWin.skey || if (pWindowResInfo->startTime > nextWin.skey ||
(nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || (nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.skey > pQuery->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { (nextWin.skey > pQuery->window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
@ -1415,7 +1418,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx));
if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) { if (pRuntimeEnv->resultInfo == NULL || pRuntimeEnv->pCtx == NULL) {
goto _error_clean; goto _clean;
} }
pRuntimeEnv->offset[0] = 0; pRuntimeEnv->offset[0] = 0;
@ -1427,7 +1430,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
int32_t index = pSqlFuncMsg->colInfo.colIndex; int32_t index = pSqlFuncMsg->colInfo.colIndex;
if (TSDB_COL_IS_TAG(pIndex->flag)) { if (TSDB_COL_IS_TAG(pIndex->flag)) {
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor
pCtx->inputBytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; pCtx->inputBytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pCtx->inputType = TSDB_DATA_TYPE_BINARY; pCtx->inputType = TSDB_DATA_TYPE_BINARY;
} else { } else {
@ -1489,7 +1492,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx); setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error_clean: _clean:
tfree(pRuntimeEnv->resultInfo); tfree(pRuntimeEnv->resultInfo);
tfree(pRuntimeEnv->pCtx); tfree(pRuntimeEnv->pCtx);
@ -1524,15 +1527,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pCtx); tfree(pRuntimeEnv->pCtx);
} }
taosDestoryInterpoInfo(&pRuntimeEnv->interpoInfo); taosDestoryFillInfo(pRuntimeEnv->pFillInfo);
if (pRuntimeEnv->pInterpoBuf != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
tfree(pRuntimeEnv->pInterpoBuf[i]);
}
tfree(pRuntimeEnv->pInterpoBuf);
}
destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo); destroyResultBuf(pRuntimeEnv->pResultBuf, pQInfo);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
@ -1975,7 +1970,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
// set the direct previous(next) point for process // set the direct previous(next) point for process
count = 2; count = 2;
if (pQuery->interpoType == TSDB_INTERPO_SET_VALUE) { if (pQuery->fillType == TSDB_FILL_SET_VALUE) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
@ -2005,7 +2000,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
} }
pInterpDetail->ts = pQuery->window.skey; pInterpDetail->ts = pQuery->window.skey;
pInterpDetail->type = pQuery->interpoType; pInterpDetail->type = pQuery->fillType;
} }
} else { } else {
TSKEY prevKey = *(TSKEY *)pPointInterpSupport->pPrevPoint[0]; TSKEY prevKey = *(TSKEY *)pPointInterpSupport->pPrevPoint[0];
@ -2040,7 +2035,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT); tVariantCreateFromBinary(&pRuntimeEnv->pCtx[i].param[3], (char *)&count, sizeof(count), TSDB_DATA_TYPE_INT);
pInterpDetail->ts = pQInfo->runtimeEnv.pQuery->window.skey; pInterpDetail->ts = pQInfo->runtimeEnv.pQuery->window.skey;
pInterpDetail->type = pQuery->interpoType; pInterpDetail->type = pQuery->fillType;
} }
} }
} }
@ -2094,23 +2089,6 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter *pPointInterpSupport) {
#endif #endif
} }
static UNUSED_FUNC void allocMemForInterpo(SQInfo *pQInfo, SQuery *pQuery, void *pMeterObj) {
#if 0
if (pQuery->interpoType != TSDB_INTERPO_NONE) {
assert(isIntervalQuery(pQuery) || (pQuery->intervalTime == 0 && isPointInterpoQuery(pQuery)));
if (isIntervalQuery(pQuery)) {
pQInfo->runtimeEnv.pInterpoBuf = malloc(POINTER_BYTES * pQuery->numOfOutput);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pQInfo->runtimeEnv.pInterpoBuf[i] =
calloc(1, sizeof(tFilePage) + pQuery->pSelectExpr[i].bytes * pMeterObj->pointsPerFileBlock);
}
}
}
#endif
}
static int32_t getInitialPageNum(SQInfo *pQInfo) { static int32_t getInitialPageNum(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t INITIAL_RESULT_ROWS_VALUE = 16; int32_t INITIAL_RESULT_ROWS_VALUE = 16;
@ -2414,6 +2392,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
} }
if (pRuntimeEnv->pFillInfo != NULL) {
pRuntimeEnv->pFillInfo->start = w.skey;
}
} }
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
@ -2427,10 +2409,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pQuery->pSelectExpr[i].bytes; int32_t bytes = pQuery->pSelectExpr[i].bytes;
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(SData)); char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
if (tmp == NULL) { // todo handle the oom if (tmp == NULL) { // todo handle the oom
} else { } else {
pQuery->sdata[i] = (SData *)tmp; pQuery->sdata[i] = (tFilePage *)tmp;
} }
// set the pCtx output buffer position // set the pCtx output buffer position
@ -2648,7 +2630,7 @@ static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t
} }
} }
void UNUSED_FUNC displayInterResult(SData **pdata, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) { void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfCols = pQuery->numOfOutput; int32_t numOfCols = pQuery->numOfOutput;
printf("super table query intermediate result, total:%d\n", numOfRows); printf("super table query intermediate result, total:%d\n", numOfRows);
@ -2787,7 +2769,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t total = 0; int32_t total = 0;
for (int32_t i = 0; i < list.size; ++i) { for (int32_t i = 0; i < list.size; ++i) {
tFilePage *pData = getResultBufferPageById(pResultBuf, list.pData[i]); tFilePage *pData = getResultBufferPageById(pResultBuf, list.pData[i]);
total += pData->numOfElems; total += pData->num;
} }
int32_t rows = total; int32_t rows = total;
@ -2800,11 +2782,11 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char * pDest = pQuery->sdata[i]->data; char * pDest = pQuery->sdata[i]->data;
memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->numOfElems, memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pData->num,
bytes * pData->numOfElems); bytes * pData->num);
} }
offset += pData->numOfElems; offset += pData->num;
} }
assert(pQuery->rec.rows == 0); assert(pQuery->rec.rows == 0);
@ -2907,7 +2889,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
if (ts == lastTimestamp) { // merge with the last one if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pWindowRes, true); doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer } else { // copy data to disk buffer
if (buffer[0]->numOfElems == pQuery->rec.capacity) { if (buffer[0]->num == pQuery->rec.capacity) {
if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
return -1; return -1;
} }
@ -2916,7 +2898,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
} }
doMerge(pRuntimeEnv, ts, pWindowRes, false); doMerge(pRuntimeEnv, ts, pWindowRes, false);
buffer[0]->numOfElems += 1; buffer[0]->num += 1;
} }
lastTimestamp = ts; lastTimestamp = ts;
@ -2935,7 +2917,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tLoserTreeAdjust(pTree, pos + pTree->numOfEntries); tLoserTreeAdjust(pTree, pos + pTree->numOfEntries);
} }
if (buffer[0]->numOfElems != 0) { // there are data in buffer if (buffer[0]->num != 0) { // there are data in buffer
if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) { if (flushFromResultBuf(pQInfo) != TSDB_CODE_SUCCESS) {
qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo); qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo);
@ -2993,10 +2975,10 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
// pagewise copy to dest buffer // pagewise copy to dest buffer
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
buf->numOfElems = r; buf->num = r;
memcpy(buf->data + pRuntimeEnv->offset[i] * buf->numOfElems, ((char *)pQuery->sdata[i]->data) + offset * bytes, memcpy(buf->data + pRuntimeEnv->offset[i] * buf->num, ((char *)pQuery->sdata[i]->data) + offset * bytes,
buf->numOfElems * bytes); buf->num * bytes);
} }
offset += r; offset += r;
@ -3534,7 +3516,7 @@ void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIdx, T
static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->numOfElems == 0, there is only fixed number of results for each group // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult); pCtx->aOutputBuf = getPosInResultPage(pRuntimeEnv, i, pResult);
@ -3788,80 +3770,43 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo
updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo); updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo);
} }
bool vnodeHasRemainResults(void *handle) { bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
SQInfo *pQInfo = (SQInfo *)handle; SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
if (pQInfo == NULL || pQInfo->runtimeEnv.pQuery->interpoType == TSDB_INTERPO_NONE) {
if (pQuery->fillType == TSDB_FILL_NONE) {
assert(pFillInfo == NULL);
return false; return false;
} }
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SInterpolationInfo *pInterpoInfo = &pRuntimeEnv->interpoInfo;
if (pQuery->limit.limit > 0 && pQuery->rec.rows >= pQuery->limit.limit) { if (pQuery->limit.limit > 0 && pQuery->rec.rows >= pQuery->limit.limit) {
return false; return false;
} }
int32_t remain = taosNumOfRemainPoints(pInterpoInfo); // There are results not returned to client, fill operation applied to the remain result set in the
// first place is required.
int32_t remain = taosNumOfRemainRows(pFillInfo);
if (remain > 0) { if (remain > 0) {
return true; return true;
} else {
if (pRuntimeEnv->pInterpoBuf == NULL) {
return false;
}
// query has completed
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
/*TSKEY ekey =*/taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime,
pQuery->slidingTimeUnit, pQuery->precision);
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY
// *)pRuntimeEnv->pInterpoBuf[0]->data,
// remain, pQuery->intervalTime, ekey,
// pQuery->pointsToRead);
// return numOfTotal > 0;
assert(0);
return false;
}
return false;
}
}
static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, tFilePage **pDataSrc, int32_t numOfRows,
int32_t outputRows) {
#if 0
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = &pRuntimeEnv->pQuery;
assert(pRuntimeEnv->pCtx[0].outputBytes == TSDB_KEYSIZE);
// build support structure for performing interpolation
SSchema *pSchema = calloc(1, sizeof(SSchema) * pQuery->numOfOutput);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pSchema[i].bytes = pRuntimeEnv->pCtx[i].outputBytes;
pSchema[i].type = pQuery->pSelectExpr[i].type;
} }
// SColumnModel *pModel = createColumnModel(pSchema, pQuery->numOfOutput, pQuery->pointsToRead); /*
* There are no results returned to client now.
char * srcData[TSDB_MAX_COLUMNS] = {0}; * If query is not completed yet, the gaps between two results blocks need to be handled after next data block
int32_t functions[TSDB_MAX_COLUMNS] = {0}; * is retrieved from TSDB.
*
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { * NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result
srcData[i] = pDataSrc[i]->data; * set is the FIRST result block, the gap between the start time of query time window and the timestamp of the
functions[i] = pQuery->pSelectExpr[i].base.functionId; * first result row in the actual result set will fill nothing.
*/
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfTotal = taosGetNumOfResultWithFill(pFillInfo, remain, ekey, pQuery->rec.capacity);
return numOfTotal > 0;
} }
assert(0); return false;
// int32_t numOfRes = taosDoInterpoResult(&pRuntimeEnv->interpoInfo, pQuery->interpoType, data, numOfRows, outputRows,
// pQuery->intervalTime, (int64_t *)pDataSrc[0]->data, pModel, srcData,
// pQuery->defaultVal, functions, pRuntimeEnv->pTabObj->pointsPerFileBlock);
destroyColumnModel(pModel);
free(pSchema);
#endif
return 0;
} }
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) { static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data) {
@ -3887,37 +3832,30 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
// all data returned, set query over // all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (pQInfo->runtimeEnv.stableQuery && isIntervalQuery(pQuery)) { if (pQInfo->runtimeEnv.stableQuery) {
if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) {
setQueryStatus(pQuery, QUERY_OVER); setQueryStatus(pQuery, QUERY_OVER);
} }
} else { } else {
setQueryStatus(pQuery, QUERY_OVER); if (!queryHasRemainResults(&pQInfo->runtimeEnv)) {
setQueryStatus(pQuery, QUERY_OVER);
}
} }
} }
} }
int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage **pDataSrc, int32_t numOfRows, int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t numOfRows, int32_t *numOfInterpo) {
int32_t *numOfInterpo) { SQuery *pQuery = pRuntimeEnv->pQuery;
// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
// SQuery * pQuery = pRuntimeEnv->pQuery;
#if 0
while (1) { while (1) {
numOfRows = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); taosGenerateDataBlock(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata, &pQuery->rec.rows, pQuery->rec.capacity);
int32_t ret = pQuery->rec.rows;
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.skey, pQuery->order.order, pQuery->intervalTime,
pQuery->slidingTimeUnit, pQuery->precision);
int32_t numOfFinalRows = taosGetNumOfResultWithInterpo(&pRuntimeEnv->interpoInfo, (TSKEY *)pDataSrc[0]->data,
numOfRows, pQuery->intervalTime, ekey, pQuery->pointsToRead);
int32_t ret = resultInterpolate(pQInfo, pDst, pDataSrc, numOfRows, numOfFinalRows);
assert(ret == numOfFinalRows);
// todo apply limit output function
/* reached the start position of according to offset value, return immediately */ /* reached the start position of according to offset value, return immediately */
if (pQuery->limit.offset == 0) { if (pQuery->limit.offset == 0) {
return ret; return ret;
} }
if (pQuery->limit.offset < ret) { if (pQuery->limit.offset < ret) {
ret -= pQuery->limit.offset; ret -= pQuery->limit.offset;
// todo !!!!there exactly number of interpo is not valid. // todo !!!!there exactly number of interpo is not valid.
@ -3932,13 +3870,12 @@ int32_t vnodeQueryResultInterpolate(SQInfo *pQInfo, tFilePage **pDst, tFilePage
pQuery->limit.offset -= ret; pQuery->limit.offset -= ret;
ret = 0; ret = 0;
} }
if (!vnodeHasRemainResults(pQInfo)) { if (!queryHasRemainResults(pRuntimeEnv)) {
return ret; return ret;
} }
} }
#endif
return 0; return 0;
} }
@ -4062,7 +3999,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
// if queried with value filter, do NOT forward query start position // if queried with value filter, do NOT forward query start position
if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->pFillInfo != NULL) {
return true; return true;
} }
@ -4199,6 +4136,27 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
} }
static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
int32_t numOfCols = pQuery->numOfOutput;
int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i];
pFillCol[i].col.bytes = pExprInfo->bytes;
pFillCol[i].col.type = pExprInfo->type;
pFillCol[i].col.offset = offset;
pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query
pFillCol[i].functionId = pExprInfo->base.functionId;
pFillCol[i].defaultVal.i = pQuery->defaultVal[i];
offset += pExprInfo->bytes;
}
return pFillCol;
}
int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
@ -4290,16 +4248,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
// pointInterpSupporterSetData(pQInfo, &interpInfo); // pointInterpSupporterSetData(pQInfo, &interpInfo);
// pointInterpSupporterDestroy(&interpInfo); // pointInterpSupporterDestroy(&interpInfo);
// int64_t rs = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, if (pQuery->fillType != TSDB_FILL_NONE) {
// pQuery->precision); SFillColInfo* pColInfo = taosCreateFillColInfo(pQuery);
// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, rs, 0, 0); pRuntimeEnv->pFillInfo = taosInitFillInfo(pQuery->order.order, 0, 0, pQuery->rec.capacity, pQuery->numOfOutput,
// allocMemForInterpo(pQInfo, pQuery, pMeterObj); pQuery->slidingTime, pQuery->fillType, pColInfo);
}
// if (!isPointInterpoQuery(pQuery)) {
// assert(pQuery->pos >= 0 && pQuery->slot >= 0);
// }
// the pQuery->window.skey is changed during normalizedFirstQueryRange, so set the newest lastkey value
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4952,7 +4906,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
// here we can ignore the records in case of no interpolation // here we can ignore the records in case of no interpolation
// todo handle offset, in case of top/bottom interval query // todo handle offset, in case of top/bottom interval query
if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 &&
pQuery->interpoType == TSDB_INTERPO_NONE) { pQuery->fillType == TSDB_FILL_NONE) {
// maxOutput <= 0, means current query does not generate any results // maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo);
@ -4977,7 +4931,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
// skip blocks without load the actual data block from file if no filter condition present // skip blocks without load the actual data block from file if no filter condition present
skipTimeInterval(pRuntimeEnv); skipTimeInterval(pRuntimeEnv);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0 && pRuntimeEnv->pFillInfo == NULL) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
return; return;
} }
@ -4994,22 +4948,18 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
} }
// the offset is handled at prepare stage if no interpolation involved // the offset is handled at prepare stage if no interpolation involved
if (pQuery->interpoType == TSDB_INTERPO_NONE) { if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) {
limitResults(pQInfo); limitResults(pQInfo);
break; break;
} else { } else {
taosInterpoSetStartInfo(&pRuntimeEnv->interpoInfo, pQuery->rec.rows, pQuery->interpoType); TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
SData **pInterpoBuf = pRuntimeEnv->pInterpoBuf; pQuery->slidingTimeUnit, pQuery->precision);
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, ekey);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
memcpy(pInterpoBuf[i]->data, pQuery->sdata[i]->data, pQuery->rec.rows * pQuery->pSelectExpr[i].bytes);
}
numOfInterpo = 0; numOfInterpo = 0;
pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, pQuery->rec.rows, &numOfInterpo);
pQuery->rec.rows, &numOfInterpo);
qTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.rows); qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pQInfo); limitResults(pQInfo);
break; break;
@ -5035,19 +4985,20 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
if (vnodeHasRemainResults(pQInfo)) { if (queryHasRemainResults(pRuntimeEnv)) {
/* /*
* There are remain results that are not returned due to result interpolation * There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results. * So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/ */
int32_t numOfInterpo = 0; int32_t numOfInterpo = 0;
int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); int32_t remain = taosNumOfRemainRows(pRuntimeEnv->pFillInfo);
pQuery->rec.rows = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, remain, &numOfInterpo);
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
limitResults(pQInfo); if (pQuery->rec.rows > 0) {
limitResults(pQInfo);
pQInfo->pointsInterpo += numOfInterpo; }
qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
return; return;
} }
@ -5105,8 +5056,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p query is killed", pQInfo); qTrace("QInfo:%p query is killed", pQInfo);
} else {// todo set the table uid and tid in log } else {// todo set the table uid and tid in log
// SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
// SPair* pair = taosArrayGet(p, 0);
qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} }
@ -5130,7 +5079,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time // record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st); pQInfo->elapsedTime += (taosGetTimestampUs() - st);
// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); // taosFillSetStartInfo(&pQInfo->runtimeEnv.pFillInfo, pQuery->size, pQInfo->query.fillType);
if (pQuery->rec.rows == 0) { if (pQuery->rec.rows == 0) {
qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables, qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables,
@ -5377,8 +5326,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg->orderType = htons(pQueryMsg->orderType); pQueryMsg->orderType = htons(pQueryMsg->orderType);
} }
pQueryMsg->interpoType = htons(pQueryMsg->interpoType); pQueryMsg->fillType = htons(pQueryMsg->fillType);
if (pQueryMsg->interpoType != TSDB_INTERPO_NONE) { if (pQueryMsg->fillType != TSDB_FILL_NONE) {
pQueryMsg->defaultVal = (uint64_t)(pMsg); pQueryMsg->defaultVal = (uint64_t)(pMsg);
int64_t *v = (int64_t *)pMsg; int64_t *v = (int64_t *)pMsg;
@ -5422,7 +5371,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset);
return 0; return 0;
} }
@ -5699,7 +5648,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->intervalTime = pQueryMsg->intervalTime; pQuery->intervalTime = pQueryMsg->intervalTime;
pQuery->slidingTime = pQueryMsg->slidingTime; pQuery->slidingTime = pQueryMsg->slidingTime;
pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit; pQuery->slidingTimeUnit = pQueryMsg->slidingTimeUnit;
pQuery->interpoType = pQueryMsg->interpoType; pQuery->fillType = pQueryMsg->fillType;
pQuery->numOfTags = pQueryMsg->numOfTags; pQuery->numOfTags = pQueryMsg->numOfTags;
// todo do not allocate ?? // todo do not allocate ??
@ -5729,7 +5678,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
} }
// prepare the result buffer // prepare the result buffer
pQuery->sdata = (SData **)calloc(pQuery->numOfOutput, POINTER_BYTES); pQuery->sdata = (tFilePage **)calloc(pQuery->numOfOutput, POINTER_BYTES);
if (pQuery->sdata == NULL) { if (pQuery->sdata == NULL) {
goto _cleanup; goto _cleanup;
} }
@ -5742,14 +5691,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
assert(pExprs[col].interBytes >= pExprs[col].bytes); assert(pExprs[col].interBytes >= pExprs[col].bytes);
// allocate additional memory for interResults that are usually larger then final results // allocate additional memory for interResults that are usually larger then final results
size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(SData); size_t size = (pQuery->rec.capacity + 1) * pExprs[col].bytes + pExprs[col].interBytes + sizeof(tFilePage);
pQuery->sdata[col] = (SData *)calloc(1, size); pQuery->sdata[col] = (tFilePage *)calloc(1, size);
if (pQuery->sdata[col] == NULL) { if (pQuery->sdata[col] == NULL) {
goto _cleanup; goto _cleanup;
} }
} }
if (pQuery->interpoType != TSDB_INTERPO_NONE) { if (pQuery->fillType != TSDB_FILL_NONE) {
pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput); pQuery->defaultVal = malloc(sizeof(int64_t) * pQuery->numOfOutput);
if (pQuery->defaultVal == NULL) { if (pQuery->defaultVal == NULL) {
goto _cleanup; goto _cleanup;

View File

@ -26,6 +26,8 @@ extern "C" {
#define TD_GE (TD_EQ | TD_GT) #define TD_GE (TD_EQ | TD_GT)
#define TD_LE (TD_EQ | TD_LT) #define TD_LE (TD_EQ | TD_LT)
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param); typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void *param);
/** /**

View File

@ -23,8 +23,6 @@
memcpy((__right), (__buf), (__size));\ memcpy((__right), (__buf), (__size));\
} while (0); } while (0);
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
static void median(void *src, size_t size, size_t s, size_t e, const void *param, __ext_compar_fn_t comparFn, void* buf) { static void median(void *src, size_t size, size_t s, size_t e, const void *param, __ext_compar_fn_t comparFn, void* buf) {
int32_t mid = ((e - s) >> 1u) + s; int32_t mid = ((e - s) >> 1u) + s;