Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0
This commit is contained in:
commit
9ed74085d8
|
@ -535,8 +535,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
|
||||||
|
|
||||||
if (asc) {
|
if (asc) {
|
||||||
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
pDataBlock->info.pks[0].val = *(int32_t*) skey;
|
GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, skey);
|
||||||
pDataBlock->info.pks[1].val = *(int32_t*) ekey;
|
GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, ekey);
|
||||||
} else { // todo refactor
|
} else { // todo refactor
|
||||||
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
|
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
|
||||||
pDataBlock->info.pks[0].nData = varDataLen(skey);
|
pDataBlock->info.pks[0].nData = varDataLen(skey);
|
||||||
|
@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||||
pDataBlock->info.pks[0].val = *(int32_t*) ekey;
|
GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, ekey);
|
||||||
pDataBlock->info.pks[1].val = *(int32_t*) skey;
|
GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, skey);
|
||||||
} else { // todo refactor
|
} else { // todo refactor
|
||||||
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
|
||||||
pDataBlock->info.pks[0].nData = varDataLen(ekey);
|
pDataBlock->info.pks[0].nData = varDataLen(ekey);
|
||||||
|
@ -1491,6 +1491,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
blockDataAppendColInfo(pBlock, &colInfo);
|
blockDataAppendColInfo(pBlock, &colInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prepare the pk buffer if necessary
|
||||||
|
if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) {
|
||||||
|
SValue* pVal = &pBlock->info.pks[0];
|
||||||
|
|
||||||
|
pVal->type = pDataBlock->info.pks[0].type;
|
||||||
|
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
|
||||||
|
|
||||||
|
pVal = &pBlock->info.pks[1];
|
||||||
|
pVal->type = pDataBlock->info.pks[1].type;
|
||||||
|
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
|
||||||
|
}
|
||||||
|
|
||||||
if (copyData) {
|
if (copyData) {
|
||||||
int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
|
int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -593,12 +593,16 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart,
|
||||||
|
|
||||||
for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) {
|
for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) {
|
||||||
SColVal *pColVal = NULL;
|
SColVal *pColVal = NULL;
|
||||||
for (int32_t iRow = 0; iRow < nRow; iRow++) {
|
for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
|
||||||
SColVal *pColValT = tRowIterNext(aIter[iRow]);
|
SColVal *pColValT = tRowIterNext(aIter[iRow]);
|
||||||
|
while (pColValT->cid < pTSchema->columns[iCol].colId) {
|
||||||
|
pColValT = tRowIterNext(aIter[iRow]);
|
||||||
|
}
|
||||||
|
|
||||||
// todo: take strategy according to the flag
|
// todo: take strategy according to the flag
|
||||||
if (COL_VAL_IS_VALUE(pColValT)) {
|
if (COL_VAL_IS_VALUE(pColValT)) {
|
||||||
pColVal = pColValT;
|
pColVal = pColValT;
|
||||||
|
break;
|
||||||
} else if (COL_VAL_IS_NULL(pColValT)) {
|
} else if (COL_VAL_IS_NULL(pColValT)) {
|
||||||
if (pColVal == NULL) {
|
if (pColVal == NULL) {
|
||||||
pColVal = pColValT;
|
pColVal = pColValT;
|
||||||
|
@ -2880,8 +2884,12 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
|
||||||
char *data) {
|
char *data) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
for (int32_t i = 0; i < nRows; ++i) {
|
if (pColData->cflag & COL_IS_KEY) {
|
||||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
|
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||||
|
} else {
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -2890,8 +2898,13 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
|
||||||
for (int32_t i = 0; i < nRows; ++i) {
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
int32_t offset = *((int32_t *)lengthOrbitmap + i);
|
int32_t offset = *((int32_t *)lengthOrbitmap + i);
|
||||||
if (offset == -1) {
|
if (offset == -1) {
|
||||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
if (pColData->cflag & COL_IS_KEY) {
|
||||||
if (code) goto _exit;
|
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
if ((code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0))) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (varDataTLen(data + offset) > bytes) {
|
if (varDataTLen(data + offset) > bytes) {
|
||||||
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
|
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
|
||||||
|
@ -2913,6 +2926,10 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
|
||||||
allValue = false;
|
allValue = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if ((pColData->cflag & COL_IS_KEY) && !allValue) {
|
||||||
|
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
if (allValue) {
|
if (allValue) {
|
||||||
// optimize (todo)
|
// optimize (todo)
|
||||||
|
@ -2951,6 +2968,10 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32
|
||||||
if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type
|
if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type
|
||||||
for (int32_t i = 0; i < pBind->num; ++i) {
|
for (int32_t i = 0; i < pBind->num; ++i) {
|
||||||
if (pBind->is_null && pBind->is_null[i]) {
|
if (pBind->is_null && pBind->is_null[i]) {
|
||||||
|
if (pColData->cflag & COL_IS_KEY) {
|
||||||
|
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
} else if (pBind->length[i] > buffMaxLen) {
|
} else if (pBind->length[i] > buffMaxLen) {
|
||||||
|
@ -2973,6 +2994,11 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32
|
||||||
allValue = true;
|
allValue = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((pColData->cflag & COL_IS_KEY) && !allValue) {
|
||||||
|
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
if (allValue) {
|
if (allValue) {
|
||||||
// optimize (todo)
|
// optimize (todo)
|
||||||
for (int32_t i = 0; i < pBind->num; ++i) {
|
for (int32_t i = 0; i < pBind->num; ++i) {
|
||||||
|
@ -3002,91 +3028,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
|
||||||
static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
|
||||||
int32_t nData1 = pColData->aOffset[i + 1] - pColData->aOffset[i];
|
|
||||||
int32_t nData2 = (j < pColData->nVal - 1) ? pColData->aOffset[j + 1] - pColData->aOffset[j]
|
|
||||||
: pColData->nData - pColData->aOffset[j];
|
|
||||||
uint8_t *pData = taosMemoryMalloc(TMAX(nData1, nData2));
|
|
||||||
if (pData == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nData1 > nData2) {
|
|
||||||
memcpy(pData, pColData->pData + pColData->aOffset[i], nData1);
|
|
||||||
memcpy(pColData->pData + pColData->aOffset[i], pColData->pData + pColData->aOffset[j], nData2);
|
|
||||||
// memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1,
|
|
||||||
// pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
|
||||||
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1],
|
|
||||||
pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
|
||||||
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pData, nData1);
|
|
||||||
} else {
|
|
||||||
memcpy(pData, pColData->pData + pColData->aOffset[j], nData2);
|
|
||||||
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i], nData1);
|
|
||||||
// memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] +
|
|
||||||
// nData1,
|
|
||||||
// pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
|
||||||
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1],
|
|
||||||
pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
|
||||||
memcpy(pColData->pData + pColData->aOffset[i], pData, nData2);
|
|
||||||
}
|
|
||||||
for (int32_t k = i + 1; k <= j; ++k) {
|
|
||||||
pColData->aOffset[k] = pColData->aOffset[k] + nData2 - nData1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(pData);
|
|
||||||
} else {
|
|
||||||
uint64_t val;
|
|
||||||
memcpy(&val, &pColData->pData[TYPE_BYTES[pColData->type] * i], TYPE_BYTES[pColData->type]);
|
|
||||||
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * i], &pColData->pData[TYPE_BYTES[pColData->type] * j],
|
|
||||||
TYPE_BYTES[pColData->type]);
|
|
||||||
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * j], &val, TYPE_BYTES[pColData->type]);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
|
|
||||||
ASSERT(i < j);
|
|
||||||
ASSERT(j < pColData->nVal);
|
|
||||||
|
|
||||||
switch (pColData->flag) {
|
|
||||||
case HAS_NONE:
|
|
||||||
case HAS_NULL:
|
|
||||||
break;
|
|
||||||
case (HAS_NULL | HAS_NONE): {
|
|
||||||
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
|
|
||||||
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
|
|
||||||
SET_BIT1(pColData->pBitMap, j, bv);
|
|
||||||
} break;
|
|
||||||
case HAS_VALUE: {
|
|
||||||
tColDataSwapValue(pColData, i, j);
|
|
||||||
} break;
|
|
||||||
case (HAS_VALUE | HAS_NONE):
|
|
||||||
case (HAS_VALUE | HAS_NULL): {
|
|
||||||
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
|
|
||||||
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
|
|
||||||
SET_BIT1(pColData->pBitMap, j, bv);
|
|
||||||
tColDataSwapValue(pColData, i, j);
|
|
||||||
} break;
|
|
||||||
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
|
|
||||||
uint8_t bv = GET_BIT2(pColData->pBitMap, i);
|
|
||||||
SET_BIT2(pColData->pBitMap, i, GET_BIT2(pColData->pBitMap, j));
|
|
||||||
SET_BIT2(pColData->pBitMap, j, bv);
|
|
||||||
tColDataSwapValue(pColData, i, j);
|
|
||||||
} break;
|
|
||||||
default:
|
|
||||||
ASSERT(0);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) {
|
static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -3170,11 +3111,27 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) {
|
||||||
|
SColVal cv;
|
||||||
|
|
||||||
|
key->ts = ((TSKEY *)aColData[0].pData)[iRow];
|
||||||
|
key->numOfPKs = 0;
|
||||||
|
|
||||||
|
for (int i = 1; i < nColData; i++) {
|
||||||
|
if (aColData[i].cflag & COL_IS_KEY) {
|
||||||
|
ASSERT(aColData->flag == HAS_VALUE);
|
||||||
|
tColDataGetValue4(&aColData[i], iRow, &cv);
|
||||||
|
key->pks[key->numOfPKs++] = cv.value;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t mid, int32_t end, int32_t nColData) {
|
static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t mid, int32_t end, int32_t nColData) {
|
||||||
SColData *aDstColData = NULL;
|
SColData *aDstColData = NULL;
|
||||||
TSKEY *aKey = (TSKEY *)aColData[0].pData;
|
int32_t i = start, j = mid + 1, k = 0;
|
||||||
|
SRowKey keyi, keyj;
|
||||||
int32_t i = start, j = mid + 1, k = 0;
|
|
||||||
|
|
||||||
if (end > start) {
|
if (end > start) {
|
||||||
aDstColData = taosMemoryCalloc(1, sizeof(SColData) * nColData);
|
aDstColData = taosMemoryCalloc(1, sizeof(SColData) * nColData);
|
||||||
|
@ -3184,30 +3141,25 @@ static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t
|
||||||
if (aDstColData == NULL) {
|
if (aDstColData == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
for (int32_t i = 0; i < nColData; i++) {
|
|
||||||
tColDataCopy(&aColData[i], &aDstColData[i], tColDataDefaultMalloc, NULL);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, i, &keyi);
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, j, &keyj);
|
||||||
while (i <= mid && j <= end) {
|
while (i <= mid && j <= end) {
|
||||||
if (aKey[i] <= aKey[j]) {
|
if (tRowKeyCompare(&keyi, &keyj) <= 0) {
|
||||||
// tColDataCopyRow(aColData, i++, aDstColData, k++);
|
|
||||||
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
|
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, i, &keyi);
|
||||||
} else {
|
} else {
|
||||||
// tColDataCopyRow(aColData, j++, aDstColData, k++);
|
|
||||||
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
|
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, j, &keyj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while (i <= mid) {
|
while (i <= mid) {
|
||||||
// tColDataCopyRow(aColData, i++, aDstColData, k++);
|
|
||||||
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
|
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (j <= end) {
|
while (j <= end) {
|
||||||
// tColDataCopyRow(aColData, j++, aDstColData, k++);
|
|
||||||
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
|
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3454,12 +3406,16 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /
|
||||||
}
|
}
|
||||||
static void tColDataMerge(SColData *aColData, int32_t nColData) {
|
static void tColDataMerge(SColData *aColData, int32_t nColData) {
|
||||||
int32_t iStart = 0;
|
int32_t iStart = 0;
|
||||||
|
SRowKey keyStart, keyEnd;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
if (iStart >= aColData[0].nVal - 1) break;
|
if (iStart >= aColData[0].nVal - 1) break;
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, iStart, &keyStart);
|
||||||
|
|
||||||
int32_t iEnd = iStart + 1;
|
int32_t iEnd = iStart + 1;
|
||||||
while (iEnd < aColData[0].nVal) {
|
while (iEnd < aColData[0].nVal) {
|
||||||
if (((TSKEY *)aColData[0].pData)[iEnd] != ((TSKEY *)aColData[0].pData)[iStart]) break;
|
tColDataArrGetRowKey(aColData, nColData, iEnd, &keyEnd);
|
||||||
|
if (tRowKeyCompare(&keyStart, &keyEnd) != 0) break;
|
||||||
|
|
||||||
iEnd++;
|
iEnd++;
|
||||||
}
|
}
|
||||||
|
@ -3473,6 +3429,7 @@ static void tColDataMerge(SColData *aColData, int32_t nColData) {
|
||||||
iStart++;
|
iStart++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tColDataSortMerge(SArray *colDataArr) {
|
void tColDataSortMerge(SArray *colDataArr) {
|
||||||
int32_t nColData = TARRAY_SIZE(colDataArr);
|
int32_t nColData = TARRAY_SIZE(colDataArr);
|
||||||
SColData *aColData = (SColData *)TARRAY_DATA(colDataArr);
|
SColData *aColData = (SColData *)TARRAY_DATA(colDataArr);
|
||||||
|
@ -3486,11 +3443,17 @@ void tColDataSortMerge(SArray *colDataArr) {
|
||||||
int8_t doSort = 0;
|
int8_t doSort = 0;
|
||||||
int8_t doMerge = 0;
|
int8_t doMerge = 0;
|
||||||
// scan -------
|
// scan -------
|
||||||
TSKEY *aKey = (TSKEY *)aColData[0].pData;
|
SRowKey lastKey;
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, 0, &lastKey);
|
||||||
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
|
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
|
||||||
if (aKey[iVal] > aKey[iVal - 1]) {
|
SRowKey key;
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, iVal, &key);
|
||||||
|
|
||||||
|
int32_t c = tRowKeyCompare(&lastKey, &key);
|
||||||
|
if (c < 0) {
|
||||||
|
lastKey = key;
|
||||||
continue;
|
continue;
|
||||||
} else if (aKey[iVal] < aKey[iVal - 1]) {
|
} else if (c > 0) {
|
||||||
doSort = 1;
|
doSort = 1;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -3504,11 +3467,17 @@ void tColDataSortMerge(SArray *colDataArr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (doMerge != 1) {
|
if (doMerge != 1) {
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, 0, &lastKey);
|
||||||
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
|
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
|
||||||
if (aKey[iVal] == aKey[iVal - 1]) {
|
SRowKey key;
|
||||||
|
tColDataArrGetRowKey(aColData, nColData, iVal, &key);
|
||||||
|
|
||||||
|
int32_t c = tRowKeyCompare(&lastKey, &key);
|
||||||
|
if (c == 0) {
|
||||||
doMerge = 1;
|
doMerge = 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
lastKey = key;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -664,10 +664,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
||||||
if (NULL != pLastCol) {
|
if (NULL != pLastCol) {
|
||||||
rocksdb_writebatch_delete(wb, keys_list[0], klen);
|
rocksdb_writebatch_delete(wb, keys_list[0], klen);
|
||||||
}
|
}
|
||||||
|
taosMemoryFreeClear(pLastCol);
|
||||||
|
|
||||||
pLastCol = tsdbCacheDeserialize(values_list[1]);
|
pLastCol = tsdbCacheDeserialize(values_list[1]);
|
||||||
if (NULL != pLastCol) {
|
if (NULL != pLastCol) {
|
||||||
rocksdb_writebatch_delete(wb, keys_list[1], klen);
|
rocksdb_writebatch_delete(wb, keys_list[1], klen);
|
||||||
}
|
}
|
||||||
|
taosMemoryFreeClear(pLastCol);
|
||||||
|
|
||||||
rocksdb_free(values_list[0]);
|
rocksdb_free(values_list[0]);
|
||||||
rocksdb_free(values_list[1]);
|
rocksdb_free(values_list[1]);
|
||||||
|
@ -675,9 +678,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
||||||
bool erase = false;
|
bool erase = false;
|
||||||
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen);
|
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
|
||||||
erase = true;
|
erase = true;
|
||||||
|
|
||||||
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
}
|
}
|
||||||
if (erase) {
|
if (erase) {
|
||||||
|
@ -687,16 +688,12 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
|
||||||
erase = false;
|
erase = false;
|
||||||
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen);
|
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen);
|
||||||
if (h) {
|
if (h) {
|
||||||
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
|
|
||||||
erase = true;
|
erase = true;
|
||||||
|
|
||||||
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
|
||||||
}
|
}
|
||||||
if (erase) {
|
if (erase) {
|
||||||
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
|
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pLastCol);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(keys_list[0]);
|
taosMemoryFree(keys_list[0]);
|
||||||
|
@ -1705,13 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
|
||||||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||||
rocksdb_writebatch_delete(wb, keys_list[i], klen);
|
rocksdb_writebatch_delete(wb, keys_list[i], klen);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pLastCol);
|
||||||
|
|
||||||
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
|
||||||
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
|
||||||
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
|
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||||
|
taosMemoryFreeClear(pLastCol);
|
||||||
|
|
||||||
taosMemoryFree(pLastCol);
|
|
||||||
rocksdb_free(values_list[i]);
|
rocksdb_free(values_list[i]);
|
||||||
rocksdb_free(values_list[i + num_keys]);
|
rocksdb_free(values_list[i + num_keys]);
|
||||||
|
|
||||||
|
|
|
@ -1678,11 +1678,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
__compar_fn_t compFn = pReader->pkComparFn;
|
__compar_fn_t compFn = pReader->pkComparFn;
|
||||||
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
|
||||||
|
|
||||||
SRowKey* pSttKey = &(SRowKey){0};
|
SRowKey* pSttKey = NULL;
|
||||||
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||||
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
} else {
|
|
||||||
pSttKey = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey k;
|
SRowKey k;
|
||||||
|
@ -1714,10 +1712,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey minKey;
|
SRowKey minKey = k;
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
minKey = k; // chosen the minimum value
|
|
||||||
|
|
||||||
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) {
|
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
@ -1726,8 +1722,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = k;
|
|
||||||
|
|
||||||
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) {
|
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) {
|
||||||
minKey = *pfKey;
|
minKey = *pfKey;
|
||||||
}
|
}
|
||||||
|
@ -1882,11 +1876,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
|
|
||||||
SRowKey* pSttKey = &(SRowKey){0};
|
SRowKey* pSttKey = NULL;
|
||||||
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
|
||||||
tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
|
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
} else {
|
|
||||||
pSttKey = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey* pfKey = &(SRowKey){0};
|
SRowKey* pfKey = &(SRowKey){0};
|
||||||
|
@ -1925,10 +1917,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowKey minKey = {0};
|
SRowKey minKey = k;
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
minKey = k; // let's find the minimum
|
|
||||||
|
|
||||||
if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) {
|
if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) {
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
@ -1941,7 +1931,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = *pSttKey;
|
minKey = *pSttKey;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = k; // let find the maximum ts value
|
|
||||||
if (pkCompEx(compFn, &ik, &minKey) > 0) {
|
if (pkCompEx(compFn, &ik, &minKey) > 0) {
|
||||||
minKey = ik;
|
minKey = ik;
|
||||||
}
|
}
|
||||||
|
@ -1968,7 +1957,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pkCompEx(compFn, &minKey, &pBlockScanInfo->lastProcKey) == 0) {
|
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -162,6 +162,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo);
|
||||||
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
||||||
|
|
||||||
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode);
|
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode);
|
||||||
|
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo);
|
||||||
|
|
||||||
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
|
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
|
||||||
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI);
|
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI);
|
||||||
|
|
|
@ -189,8 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t lastTs = TSKEY_MIN;
|
int64_t lastTs = TSKEY_MIN;
|
||||||
bool updateLastRow = false;
|
bool needSortMerge = false;
|
||||||
bool disorderTs = false;
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||||
taosArrayClear(pVals);
|
taosArrayClear(pVals);
|
||||||
|
@ -258,11 +257,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
|
||||||
taosArrayPush(pVals, &cv);
|
taosArrayPush(pVals, &cv);
|
||||||
} else {
|
} else {
|
||||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
|
||||||
if (*(int64_t*)var == lastTs) {
|
if (*(int64_t*)var <= lastTs) {
|
||||||
updateLastRow = true;
|
needSortMerge = true;
|
||||||
} else if (*(int64_t*)var < lastTs) {
|
|
||||||
disorderTs = true;
|
|
||||||
} else {
|
} else {
|
||||||
lastTs = *(int64_t*)var;
|
lastTs = *(int64_t*)var;
|
||||||
}
|
}
|
||||||
|
@ -287,17 +284,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
|
||||||
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
if (updateLastRow) {
|
taosArrayPush(tbData.aRowP, &pRow);
|
||||||
updateLastRow = false;
|
|
||||||
SRow** lastRow = taosArrayPop(tbData.aRowP);
|
|
||||||
tRowDestroy(*lastRow);
|
|
||||||
taosArrayPush(tbData.aRowP, &pRow);
|
|
||||||
} else {
|
|
||||||
taosArrayPush(tbData.aRowP, &pRow);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (disorderTs) {
|
if (needSortMerge) {
|
||||||
if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
|
if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
|
||||||
(terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
|
(terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
|
||||||
goto _end;
|
goto _end;
|
||||||
|
|
|
@ -250,6 +250,34 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
|
||||||
|
SDataBlockInfo* pBlockInfo = &pDataBlock->info;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
|
||||||
|
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
|
||||||
|
if (pItem->isPk) {
|
||||||
|
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
|
||||||
|
pBlockInfo->pks[0].type = pInfoData->info.type;
|
||||||
|
pBlockInfo->pks[1].type = pInfoData->info.type;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
|
||||||
|
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||||
|
if (pBlockInfo->pks[0].pData == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||||
|
if (pBlockInfo->pks[1].pData == NULL) {
|
||||||
|
taosMemoryFreeClear(pBlockInfo->pks[0].pData);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
||||||
SMetaReader* mr = (SMetaReader*)pContext;
|
SMetaReader* mr = (SMetaReader*)pContext;
|
||||||
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
|
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
|
||||||
|
|
|
@ -1196,23 +1196,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||||
|
prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
|
||||||
|
|
||||||
{ // todo :refactor:
|
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info;
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
|
|
||||||
SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i);
|
|
||||||
if (pItem->isPk) {
|
|
||||||
SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId);
|
|
||||||
pBlockInfo->pks[0].type = pInfoData->info.type;
|
|
||||||
pBlockInfo->pks[1].type = pInfoData->info.type;
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
|
|
||||||
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
|
||||||
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -4607,6 +4592,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->bSortRowId = false;
|
pInfo->bSortRowId = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
|
||||||
|
|
||||||
pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
|
||||||
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
|
|
||||||
|
|
|
@ -138,7 +138,7 @@ typedef struct SElapsedInfo {
|
||||||
|
|
||||||
typedef struct STwaInfo {
|
typedef struct STwaInfo {
|
||||||
double dOutput;
|
double dOutput;
|
||||||
bool isNull;
|
int64_t numOfElems;
|
||||||
SPoint1 p;
|
SPoint1 p;
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
} STwaInfo;
|
} STwaInfo;
|
||||||
|
@ -600,10 +600,10 @@ bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
|
||||||
bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
|
||||||
SFuncInputRowIter* pIter = &pCtx->rowIter;
|
SFuncInputRowIter* pIter = &pCtx->rowIter;
|
||||||
if (pCtx->hasPrimaryKey) {
|
if (pCtx->hasPrimaryKey) {
|
||||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||||
return funcInputGetNextRowDescPk(pIter, pRow);
|
|
||||||
} else {
|
|
||||||
return funcInputGetNextRowAscPk(pIter, pRow);
|
return funcInputGetNextRowAscPk(pIter, pRow);
|
||||||
|
} else {
|
||||||
|
return funcInputGetNextRowDescPk(pIter, pRow);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return funcInputGetNextRowNoPk(pIter, pRow);
|
return funcInputGetNextRowNoPk(pIter, pRow);
|
||||||
|
@ -5556,7 +5556,7 @@ bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
pInfo->isNull = false;
|
pInfo->numOfElems = 0;
|
||||||
pInfo->p.key = INT64_MIN;
|
pInfo->p.key = INT64_MIN;
|
||||||
pInfo->win = TSWINDOW_INITIALIZER;
|
pInfo->win = TSWINDOW_INITIALIZER;
|
||||||
return true;
|
return true;
|
||||||
|
@ -5581,13 +5581,11 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SPoint1* last = &pInfo->p;
|
||||||
SPoint1* last = &pInfo->p;
|
|
||||||
int32_t numOfElems = 0;
|
|
||||||
|
|
||||||
if (IS_NULL_TYPE(pInputCol->info.type)) {
|
if (IS_NULL_TYPE(pInputCol->info.type)) {
|
||||||
pInfo->isNull = true;
|
pInfo->numOfElems = 0;
|
||||||
goto _twa_over;
|
goto _twa_over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5605,7 +5603,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
pInfo->dOutput += twa_get_area(pCtx->start, *last);
|
pInfo->dOutput += twa_get_area(pCtx->start, *last);
|
||||||
pInfo->win.skey = pCtx->start.key;
|
pInfo->win.skey = pCtx->start.key;
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if (pInfo->p.key == INT64_MIN) {
|
} else if (pInfo->p.key == INT64_MIN) {
|
||||||
|
@ -5619,7 +5617,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
|
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
|
||||||
|
|
||||||
pInfo->win.skey = last->key;
|
pInfo->win.skey = last->key;
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5633,7 +5631,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5651,7 +5649,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5668,7 +5666,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5685,7 +5683,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5702,7 +5700,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5719,7 +5717,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(double*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(double*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5736,7 +5734,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5753,7 +5751,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5770,7 +5768,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5787,7 +5785,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (row.isDataNull) {
|
if (row.isDataNull) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
numOfElems++;
|
pInfo->numOfElems++;
|
||||||
|
|
||||||
INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData);
|
INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData);
|
||||||
if (pInfo->p.key == st.key) {
|
if (pInfo->p.key == st.key) {
|
||||||
|
@ -5808,16 +5806,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (pCtx->end.key != INT64_MIN) {
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
|
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
|
||||||
pInfo->p = pCtx->end;
|
pInfo->p = pCtx->end;
|
||||||
numOfElems += 1;
|
pInfo->numOfElems += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->win.ekey = pInfo->p.key;
|
pInfo->win.ekey = pInfo->p.key;
|
||||||
|
|
||||||
_twa_over:
|
_twa_over:
|
||||||
if (numOfElems == 0) {
|
|
||||||
pInfo->isNull = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
SET_VAL(pResInfo, 1, 1);
|
SET_VAL(pResInfo, 1, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -5838,7 +5832,7 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
if (pInfo->isNull == true) {
|
if (pInfo->numOfElems == 0) {
|
||||||
pResInfo->numOfRes = 0;
|
pResInfo->numOfRes = 0;
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->win.ekey == pInfo->win.skey) {
|
if (pInfo->win.ekey == pInfo->win.skey) {
|
||||||
|
|
|
@ -1392,7 +1392,7 @@ int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
|
||||||
if (NULL == pCol) {
|
if (NULL == pCol) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
tColDataInit(pCol, pSchema->colId, pSchema->type, 0);
|
tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -3390,8 +3390,6 @@ class TDTestCase:
|
||||||
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
|
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
|
||||||
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)")
|
||||||
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:15') every(1s) fill(null)")
|
|
||||||
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
|
|
||||||
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
|
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
|
||||||
|
|
||||||
tdLog.printNoPrefix("======step 14: test interp ignore null values")
|
tdLog.printNoPrefix("======step 14: test interp ignore null values")
|
||||||
|
|
|
@ -1923,6 +1923,9 @@ int sml_td29373_Test() {
|
||||||
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
|
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1; i++) {
|
||||||
|
taosMemoryFree(sql3[i]);
|
||||||
|
}
|
||||||
|
|
||||||
// case 4
|
// case 4
|
||||||
const char *sql4[] = {
|
const char *sql4[] = {
|
||||||
|
|
Loading…
Reference in New Issue