[td-225] fix bug while multiple tags exists during fill results
This commit is contained in:
parent
07e2f96115
commit
7c4f8f6464
|
@ -380,20 +380,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
|
|||
4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
|
||||
tinfo.precision, pQueryInfo->fillType, pFillCol);
|
||||
}
|
||||
|
||||
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
|
||||
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 && pReducer->pFillInfo != NULL) {
|
||||
pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
|
||||
for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
|
||||
SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
|
||||
pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1];
|
||||
}
|
||||
} else {
|
||||
if (pReducer->pFillInfo != NULL) {
|
||||
assert(pReducer->pFillInfo->pTags == NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
|
||||
|
|
|
@ -2141,43 +2141,6 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
}
|
||||
}
|
||||
|
||||
//void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
|
||||
// SFieldSupInfo* pInfo = TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
|
||||
// assert(pInfo->pSqlExpr != NULL);
|
||||
//
|
||||
// int32_t type = pInfo->pSqlExpr->resType;
|
||||
// int32_t bytes = pInfo->pSqlExpr->resBytes;
|
||||
//
|
||||
// char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
|
||||
//
|
||||
// if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
|
||||
// int32_t realLen = varDataLen(pData);
|
||||
// assert(realLen <= bytes - VARSTR_HEADER_SIZE);
|
||||
//
|
||||
// if (isNull(pData, type)) {
|
||||
// pRes->tsrow[columnIndex] = NULL;
|
||||
// } else {
|
||||
// pRes->tsrow[columnIndex] = ((tstr*)pData)->data;
|
||||
// }
|
||||
//
|
||||
// if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
|
||||
// *(pData + realLen + VARSTR_HEADER_SIZE) = 0;
|
||||
// }
|
||||
//
|
||||
// pRes->length[columnIndex] = realLen;
|
||||
// } else {
|
||||
// assert(bytes == tDataTypeDesc[type].nSize);
|
||||
//
|
||||
// if (isNull(pData, type)) {
|
||||
// pRes->tsrow[columnIndex] = NULL;
|
||||
// } else {
|
||||
// pRes->tsrow[columnIndex] = pData;
|
||||
// }
|
||||
//
|
||||
// pRes->length[columnIndex] = bytes;
|
||||
// }
|
||||
//}
|
||||
|
||||
void* malloc_throw(size_t size) {
|
||||
void* p = malloc(size);
|
||||
if (p == NULL) {
|
||||
|
|
|
@ -30,6 +30,11 @@ typedef struct {
|
|||
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
|
||||
union {int64_t i; double d;} fillVal;
|
||||
} SFillColInfo;
|
||||
|
||||
typedef struct {
|
||||
SSchema col;
|
||||
char* tagVal;
|
||||
} SFillTagColInfo;
|
||||
|
||||
typedef struct SFillInfo {
|
||||
TSKEY start; // start timestamp
|
||||
|
@ -44,7 +49,8 @@ typedef struct SFillInfo {
|
|||
int32_t numOfTags; // number of tags
|
||||
int32_t numOfCols; // number of columns, including the tags columns
|
||||
int32_t rowSize; // size of each row
|
||||
char ** pTags; // tags value for current interpolation
|
||||
// char ** pTags; // tags value for current interpolation
|
||||
SFillTagColInfo* pTags; // tags value for filling gap
|
||||
int64_t slidingTime; // sliding value to determine the number of result for a given time window
|
||||
char * prevValues; // previous row of data, to generate the interpolation results
|
||||
char * nextValues; // next row of data
|
||||
|
|
|
@ -42,19 +42,38 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
|
|||
pFillInfo->slidingUnit = slidingUnit;
|
||||
|
||||
pFillInfo->pData = malloc(POINTER_BYTES * numOfCols);
|
||||
|
||||
int32_t rowsize = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t bytes = pFillInfo->pFillCol[i].col.bytes;
|
||||
pFillInfo->pData[i] = calloc(1, bytes * capacity);
|
||||
|
||||
rowsize += bytes;
|
||||
}
|
||||
|
||||
if (numOfTags > 0) {
|
||||
pFillInfo->pTags = calloc(1, pFillInfo->numOfTags * POINTER_BYTES + rowsize);
|
||||
pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo));
|
||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||
pFillInfo->pTags[i].col.colId = -2;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t rowsize = 0;
|
||||
int32_t k = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SFillColInfo* pColInfo = &pFillInfo->pFillCol[i];
|
||||
pFillInfo->pData[i] = calloc(1, pColInfo->col.bytes * capacity);
|
||||
|
||||
if (pColInfo->flag == TSDB_COL_TAG) {
|
||||
bool exists = false;
|
||||
for(int32_t j = 0; j < k; ++j) {
|
||||
if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) {
|
||||
exists = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!exists) {
|
||||
pFillInfo->pTags[k].col.colId = pColInfo->col.colId;
|
||||
pFillInfo->pTags[k].tagVal = calloc(1, pColInfo->col.bytes);
|
||||
|
||||
k += 1;
|
||||
}
|
||||
}
|
||||
rowsize += pColInfo->col.bytes;
|
||||
}
|
||||
|
||||
pFillInfo->rowSize = rowsize;
|
||||
pFillInfo->capacityInRows = capacity;
|
||||
|
||||
|
@ -129,16 +148,21 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
|
|||
|
||||
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInput) {
|
||||
assert(pFillInfo->numOfRows == pInput->num);
|
||||
int32_t t = 0;
|
||||
|
||||
|
||||
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||
|
||||
char* s = pInput->data + pCol->col.offset * pInput->num;
|
||||
memcpy(pFillInfo->pData[i], s, pInput->num * pCol->col.bytes);
|
||||
|
||||
if (pCol->flag == TSDB_COL_TAG && t < pFillInfo->numOfTags) { // copy the tag value
|
||||
memcpy(pFillInfo->pTags[t++], pFillInfo->pData[i], pCol->col.bytes);
|
||||
|
||||
char* data = pInput->data + pCol->col.offset * pInput->num;
|
||||
memcpy(pFillInfo->pData[i], data, pInput->num * pCol->col.bytes);
|
||||
|
||||
if (pCol->flag == TSDB_COL_TAG) { // copy the tag value to tag value buffer
|
||||
for (int32_t j = 0; j < pFillInfo->numOfTags; ++j) {
|
||||
SFillTagColInfo* pTag = &pFillInfo->pTags[j];
|
||||
if (pTag->col.colId == pCol->col.colId) {
|
||||
memcpy(pTag->tagVal, data, pCol->col.bytes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -224,22 +248,31 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) {
|
||||
for (int32_t j = 0, i = start; i < pColInfo->numOfCols; ++i, ++j) {
|
||||
SFillColInfo* pCol = &pColInfo->pFillCol[i];
|
||||
|
||||
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
|
||||
assignVal(val1, pTags[j], pCol->col.bytes, pCol->col.type);
|
||||
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, SFillTagColInfo *pTags, int32_t start, int32_t num) {
|
||||
for(int32_t j = 0; j < pColInfo->numOfCols; ++j) {
|
||||
SFillColInfo* pCol = &pColInfo->pFillCol[j];
|
||||
if (pCol->flag == TSDB_COL_NORMAL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, num);
|
||||
|
||||
for(int32_t i = 0; i < pColInfo->numOfTags; ++i) {
|
||||
SFillTagColInfo* pTag = &pColInfo->pTags[i];
|
||||
if (pTag->col.colId == pCol->col.colId) {
|
||||
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
|
||||
int64_t ts, char** pTags, bool outOfBound) {
|
||||
static void doFillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t* num, char** srcData,
|
||||
int64_t ts, SFillTagColInfo* pTags, bool outOfBound) {
|
||||
char* prevValues = pFillInfo->prevValues;
|
||||
char* nextValues = pFillInfo->nextValues;
|
||||
|
||||
SPoint point1, point2, point;
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
|
||||
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, *num);
|
||||
|
@ -364,17 +397,16 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
|
|||
char** nextValues = &pFillInfo->nextValues;
|
||||
|
||||
int32_t numOfTags = pFillInfo->numOfTags;
|
||||
char** pTags = pFillInfo->pTags;
|
||||
SFillTagColInfo* pTags = pFillInfo->pTags;
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||
|
||||
if (numOfRows == 0) {
|
||||
/*
|
||||
* These data are generated according to fill strategy, since the current timestamp is out of time window of
|
||||
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
|
||||
*/
|
||||
while (num < outputRows) {
|
||||
doInterpoResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
|
||||
doFillResultImpl(pFillInfo, data, &num, srcData, pFillInfo->start, pTags, true);
|
||||
}
|
||||
|
||||
pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
|
||||
|
@ -401,7 +433,7 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
|
|||
|
||||
while (((pFillInfo->start < ts && FILL_IS_ASC_FILL(pFillInfo)) ||
|
||||
(pFillInfo->start > ts && !FILL_IS_ASC_FILL(pFillInfo))) && num < outputRows) {
|
||||
doInterpoResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false);
|
||||
doFillResultImpl(pFillInfo, data, &num, srcData, ts, pTags, false);
|
||||
}
|
||||
|
||||
/* output buffer is full, abort */
|
||||
|
|
Loading…
Reference in New Issue