Merge branch 'feat/TS-4243-3.0' of github.com:taosdata/TDengine into TEST/3.0/TS-4243

This commit is contained in:
Chris Zhai 2024-04-08 20:30:33 +08:00
commit cdd8f3fa9d
11 changed files with 167 additions and 199 deletions

View File

@ -535,8 +535,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
if (asc) {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int32_t*) skey;
pDataBlock->info.pks[1].val = *(int32_t*) ekey;
GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, skey);
GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, ekey);
} else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
pDataBlock->info.pks[0].nData = varDataLen(skey);
@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
}
} else {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int32_t*) ekey;
pDataBlock->info.pks[1].val = *(int32_t*) skey;
GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, ekey);
GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, skey);
} else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
pDataBlock->info.pks[0].nData = varDataLen(ekey);
@ -1491,6 +1491,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
blockDataAppendColInfo(pBlock, &colInfo);
}
// prepare the pk buffer if necessary
if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) {
SValue* pVal = &pBlock->info.pks[0];
pVal->type = pDataBlock->info.pks[0].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
pVal = &pBlock->info.pks[1];
pVal->type = pDataBlock->info.pks[1].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
}
if (copyData) {
int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -593,12 +593,16 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart,
for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) {
SColVal *pColVal = NULL;
for (int32_t iRow = 0; iRow < nRow; iRow++) {
for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) {
SColVal *pColValT = tRowIterNext(aIter[iRow]);
while (pColValT->cid < pTSchema->columns[iCol].colId) {
pColValT = tRowIterNext(aIter[iRow]);
}
// todo: take strategy according to the flag
if (COL_VAL_IS_VALUE(pColValT)) {
pColVal = pColValT;
break;
} else if (COL_VAL_IS_NULL(pColValT)) {
if (pColVal == NULL) {
pColVal = pColValT;
@ -2880,8 +2884,12 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
char *data) {
int32_t code = 0;
if (data == NULL) {
for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
if (pColData->cflag & COL_IS_KEY) {
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
} else {
for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
}
}
goto _exit;
}
@ -2890,8 +2898,13 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
for (int32_t i = 0; i < nRows; ++i) {
int32_t offset = *((int32_t *)lengthOrbitmap + i);
if (offset == -1) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
if (pColData->cflag & COL_IS_KEY) {
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _exit;
}
if ((code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0))) {
goto _exit;
}
} else {
if (varDataTLen(data + offset) > bytes) {
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
@ -2913,6 +2926,10 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
allValue = false;
}
}
if ((pColData->cflag & COL_IS_KEY) && !allValue) {
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _exit;
}
if (allValue) {
// optimize (todo)
@ -2951,6 +2968,10 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32
if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type
for (int32_t i = 0; i < pBind->num; ++i) {
if (pBind->is_null && pBind->is_null[i]) {
if (pColData->cflag & COL_IS_KEY) {
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _exit;
}
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
} else if (pBind->length[i] > buffMaxLen) {
@ -2973,6 +2994,11 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32
allValue = true;
}
if ((pColData->cflag & COL_IS_KEY) && !allValue) {
code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _exit;
}
if (allValue) {
// optimize (todo)
for (int32_t i = 0; i < pBind->num; ++i) {
@ -3002,91 +3028,6 @@ _exit:
return code;
}
#ifdef BUILD_NO_CALL
static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
int32_t code = 0;
if (IS_VAR_DATA_TYPE(pColData->type)) {
int32_t nData1 = pColData->aOffset[i + 1] - pColData->aOffset[i];
int32_t nData2 = (j < pColData->nVal - 1) ? pColData->aOffset[j + 1] - pColData->aOffset[j]
: pColData->nData - pColData->aOffset[j];
uint8_t *pData = taosMemoryMalloc(TMAX(nData1, nData2));
if (pData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if (nData1 > nData2) {
memcpy(pData, pColData->pData + pColData->aOffset[i], nData1);
memcpy(pColData->pData + pColData->aOffset[i], pColData->pData + pColData->aOffset[j], nData2);
// memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1,
// pColData->aOffset[j] - pColData->aOffset[i + 1]);
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1],
pColData->aOffset[j] - pColData->aOffset[i + 1]);
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pData, nData1);
} else {
memcpy(pData, pColData->pData + pColData->aOffset[j], nData2);
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i], nData1);
// memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] +
// nData1,
// pColData->aOffset[j] - pColData->aOffset[i + 1]);
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1],
pColData->aOffset[j] - pColData->aOffset[i + 1]);
memcpy(pColData->pData + pColData->aOffset[i], pData, nData2);
}
for (int32_t k = i + 1; k <= j; ++k) {
pColData->aOffset[k] = pColData->aOffset[k] + nData2 - nData1;
}
taosMemoryFree(pData);
} else {
uint64_t val;
memcpy(&val, &pColData->pData[TYPE_BYTES[pColData->type] * i], TYPE_BYTES[pColData->type]);
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * i], &pColData->pData[TYPE_BYTES[pColData->type] * j],
TYPE_BYTES[pColData->type]);
memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * j], &val, TYPE_BYTES[pColData->type]);
}
_exit:
return code;
}
static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
ASSERT(i < j);
ASSERT(j < pColData->nVal);
switch (pColData->flag) {
case HAS_NONE:
case HAS_NULL:
break;
case (HAS_NULL | HAS_NONE): {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
SET_BIT1(pColData->pBitMap, j, bv);
} break;
case HAS_VALUE: {
tColDataSwapValue(pColData, i, j);
} break;
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL): {
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j));
SET_BIT1(pColData->pBitMap, j, bv);
tColDataSwapValue(pColData, i, j);
} break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
uint8_t bv = GET_BIT2(pColData->pBitMap, i);
SET_BIT2(pColData->pBitMap, i, GET_BIT2(pColData->pBitMap, j));
SET_BIT2(pColData->pBitMap, j, bv);
tColDataSwapValue(pColData, i, j);
} break;
default:
ASSERT(0);
break;
}
}
#endif
static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) {
int32_t code = TSDB_CODE_SUCCESS;
@ -3170,11 +3111,27 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S
return code;
}
static FORCE_INLINE void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) {
SColVal cv;
key->ts = ((TSKEY *)aColData[0].pData)[iRow];
key->numOfPKs = 0;
for (int i = 1; i < nColData; i++) {
if (aColData[i].cflag & COL_IS_KEY) {
ASSERT(aColData->flag == HAS_VALUE);
tColDataGetValue4(&aColData[i], iRow, &cv);
key->pks[key->numOfPKs++] = cv.value;
} else {
break;
}
}
}
static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t mid, int32_t end, int32_t nColData) {
SColData *aDstColData = NULL;
TSKEY *aKey = (TSKEY *)aColData[0].pData;
int32_t i = start, j = mid + 1, k = 0;
int32_t i = start, j = mid + 1, k = 0;
SRowKey keyi, keyj;
if (end > start) {
aDstColData = taosMemoryCalloc(1, sizeof(SColData) * nColData);
@ -3184,30 +3141,25 @@ static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t
if (aDstColData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
/*
for (int32_t i = 0; i < nColData; i++) {
tColDataCopy(&aColData[i], &aDstColData[i], tColDataDefaultMalloc, NULL);
}
*/
}
tColDataArrGetRowKey(aColData, nColData, i, &keyi);
tColDataArrGetRowKey(aColData, nColData, j, &keyj);
while (i <= mid && j <= end) {
if (aKey[i] <= aKey[j]) {
// tColDataCopyRow(aColData, i++, aDstColData, k++);
if (tRowKeyCompare(&keyi, &keyj) <= 0) {
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
tColDataArrGetRowKey(aColData, nColData, i, &keyi);
} else {
// tColDataCopyRow(aColData, j++, aDstColData, k++);
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
tColDataArrGetRowKey(aColData, nColData, j, &keyj);
}
}
while (i <= mid) {
// tColDataCopyRow(aColData, i++, aDstColData, k++);
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
}
while (j <= end) {
// tColDataCopyRow(aColData, j++, aDstColData, k++);
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
}
@ -3454,12 +3406,16 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /
}
static void tColDataMerge(SColData *aColData, int32_t nColData) {
int32_t iStart = 0;
SRowKey keyStart, keyEnd;
for (;;) {
if (iStart >= aColData[0].nVal - 1) break;
tColDataArrGetRowKey(aColData, nColData, iStart, &keyStart);
int32_t iEnd = iStart + 1;
while (iEnd < aColData[0].nVal) {
if (((TSKEY *)aColData[0].pData)[iEnd] != ((TSKEY *)aColData[0].pData)[iStart]) break;
tColDataArrGetRowKey(aColData, nColData, iEnd, &keyEnd);
if (tRowKeyCompare(&keyStart, &keyEnd) != 0) break;
iEnd++;
}
@ -3473,6 +3429,7 @@ static void tColDataMerge(SColData *aColData, int32_t nColData) {
iStart++;
}
}
void tColDataSortMerge(SArray *colDataArr) {
int32_t nColData = TARRAY_SIZE(colDataArr);
SColData *aColData = (SColData *)TARRAY_DATA(colDataArr);
@ -3486,11 +3443,17 @@ void tColDataSortMerge(SArray *colDataArr) {
int8_t doSort = 0;
int8_t doMerge = 0;
// scan -------
TSKEY *aKey = (TSKEY *)aColData[0].pData;
SRowKey lastKey;
tColDataArrGetRowKey(aColData, nColData, 0, &lastKey);
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
if (aKey[iVal] > aKey[iVal - 1]) {
SRowKey key;
tColDataArrGetRowKey(aColData, nColData, iVal, &key);
int32_t c = tRowKeyCompare(&lastKey, &key);
if (c < 0) {
lastKey = key;
continue;
} else if (aKey[iVal] < aKey[iVal - 1]) {
} else if (c > 0) {
doSort = 1;
break;
} else {
@ -3504,11 +3467,17 @@ void tColDataSortMerge(SArray *colDataArr) {
}
if (doMerge != 1) {
tColDataArrGetRowKey(aColData, nColData, 0, &lastKey);
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
if (aKey[iVal] == aKey[iVal - 1]) {
SRowKey key;
tColDataArrGetRowKey(aColData, nColData, iVal, &key);
int32_t c = tRowKeyCompare(&lastKey, &key);
if (c == 0) {
doMerge = 1;
break;
}
lastKey = key;
}
}

View File

@ -664,10 +664,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[0], klen);
}
taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[1]);
if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[1], klen);
}
taosMemoryFreeClear(pLastCol);
rocksdb_free(values_list[0]);
rocksdb_free(values_list[1]);
@ -675,9 +678,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
bool erase = false;
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
erase = true;
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
}
if (erase) {
@ -687,16 +688,12 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
erase = false;
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
erase = true;
taosLRUCacheRelease(pTsdb->lruCache, h, erase);
}
if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
}
taosMemoryFree(pLastCol);
}
taosMemoryFree(keys_list[0]);
@ -1705,13 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen);
}
taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
}
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
taosMemoryFreeClear(pLastCol);
taosMemoryFree(pLastCol);
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);

View File

@ -1678,11 +1678,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
__compar_fn_t compFn = pReader->pkComparFn;
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
SRowKey* pSttKey = &(SRowKey){0};
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
} else {
pSttKey = NULL;
}
SRowKey k;
@ -1714,10 +1712,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
SRowKey minKey;
SRowKey minKey = k;
if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = k; // chosen the minimum value
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) {
minKey = *pfKey;
}
@ -1726,8 +1722,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = *pSttKey;
}
} else {
minKey = k;
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) {
minKey = *pfKey;
}
@ -1882,11 +1876,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
SRowKey* pSttKey = &(SRowKey){0};
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader));
} else {
pSttKey = NULL;
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
}
SRowKey* pfKey = &(SRowKey){0};
@ -1925,10 +1917,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
SRowKey minKey = {0};
SRowKey minKey = k;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
minKey = k; // let's find the minimum
if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) {
minKey = ik;
}
@ -1941,7 +1931,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = *pSttKey;
}
} else {
minKey = k; // let find the maximum ts value
if (pkCompEx(compFn, &ik, &minKey) > 0) {
minKey = ik;
}
@ -1968,7 +1957,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
}
if (pkCompEx(compFn, &minKey, &pBlockScanInfo->lastProcKey) == 0) {
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -162,6 +162,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode);
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI);

View File

@ -189,8 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
int64_t lastTs = TSKEY_MIN;
bool updateLastRow = false;
bool disorderTs = false;
bool needSortMerge = false;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
taosArrayClear(pVals);
@ -258,11 +257,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);
} else {
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
if (*(int64_t*)var == lastTs) {
updateLastRow = true;
} else if (*(int64_t*)var < lastTs) {
disorderTs = true;
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) {
if (*(int64_t*)var <= lastTs) {
needSortMerge = true;
} else {
lastTs = *(int64_t*)var;
}
@ -287,17 +284,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
if (updateLastRow) {
updateLastRow = false;
SRow** lastRow = taosArrayPop(tbData.aRowP);
tRowDestroy(*lastRow);
taosArrayPush(tbData.aRowP, &pRow);
} else {
taosArrayPush(tbData.aRowP, &pRow);
}
taosArrayPush(tbData.aRowP, &pRow);
}
if (disorderTs) {
if (needSortMerge) {
if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
(terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
goto _end;

View File

@ -250,6 +250,34 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
return pBlock;
}
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
SDataBlockInfo* pBlockInfo = &pDataBlock->info;
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
pBlockInfo->pks[0].type = pInfoData->info.type;
pBlockInfo->pks[1].type = pInfoData->info.type;
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
if (pBlockInfo->pks[0].pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
if (pBlockInfo->pks[1].pData == NULL) {
taosMemoryFreeClear(pBlockInfo->pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
SMetaReader* mr = (SMetaReader*)pContext;
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {

View File

@ -1196,23 +1196,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
{ // todo :refactor:
SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info;
for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId);
pBlockInfo->pks[0].type = pInfoData->info.type;
pBlockInfo->pks[1].type = pInfoData->info.type;
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
}
}
}
}
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -4418,6 +4403,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->bSortRowId = false;
}
prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);

View File

@ -138,7 +138,7 @@ typedef struct SElapsedInfo {
typedef struct STwaInfo {
double dOutput;
bool isNull;
int64_t numOfElems;
SPoint1 p;
STimeWindow win;
} STwaInfo;
@ -600,10 +600,10 @@ bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
SFuncInputRowIter* pIter = &pCtx->rowIter;
if (pCtx->hasPrimaryKey) {
if (pCtx->order == TSDB_ORDER_DESC) {
return funcInputGetNextRowDescPk(pIter, pRow);
} else {
if (pCtx->order == TSDB_ORDER_ASC) {
return funcInputGetNextRowAscPk(pIter, pRow);
} else {
return funcInputGetNextRowDescPk(pIter, pRow);
}
} else {
return funcInputGetNextRowNoPk(pIter, pRow);
@ -5556,7 +5556,7 @@ bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
}
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->isNull = false;
pInfo->numOfElems = 0;
pInfo->p.key = INT64_MIN;
pInfo->win = TSWINDOW_INITIALIZER;
return true;
@ -5581,13 +5581,11 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pInputCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SPoint1* last = &pInfo->p;
int32_t numOfElems = 0;
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SPoint1* last = &pInfo->p;
if (IS_NULL_TYPE(pInputCol->info.type)) {
pInfo->isNull = true;
pInfo->numOfElems = 0;
goto _twa_over;
}
@ -5605,7 +5603,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
pInfo->dOutput += twa_get_area(pCtx->start, *last);
pInfo->win.skey = pCtx->start.key;
numOfElems++;
pInfo->numOfElems++;
break;
}
} else if (pInfo->p.key == INT64_MIN) {
@ -5619,7 +5617,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData);
pInfo->win.skey = last->key;
numOfElems++;
pInfo->numOfElems++;
break;
}
}
@ -5633,7 +5631,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5651,7 +5649,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5668,7 +5666,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5685,7 +5683,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5702,7 +5700,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5719,7 +5717,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(double*)row.pData);
if (pInfo->p.key == st.key) {
@ -5736,7 +5734,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5753,7 +5751,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5770,7 +5768,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5787,7 +5785,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (row.isDataNull) {
continue;
}
numOfElems++;
pInfo->numOfElems++;
INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData);
if (pInfo->p.key == st.key) {
@ -5808,16 +5806,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
pInfo->p = pCtx->end;
numOfElems += 1;
pInfo->numOfElems += 1;
}
pInfo->win.ekey = pInfo->p.key;
_twa_over:
if (numOfElems == 0) {
pInfo->isNull = true;
}
SET_VAL(pResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
@ -5838,7 +5832,7 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->isNull == true) {
if (pInfo->numOfElems == 0) {
pResInfo->numOfRes = 0;
} else {
if (pInfo->win.ekey == pInfo->win.skey) {

View File

@ -1392,7 +1392,7 @@ int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
if (NULL == pCol) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tColDataInit(pCol, pSchema->colId, pSchema->type, 0);
tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags);
}
return TSDB_CODE_SUCCESS;

View File

@ -3390,8 +3390,6 @@ class TDTestCase:
tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')")
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)")
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:15') every(1s) fill(null)")
tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)")
tdLog.printNoPrefix("======step 14: test interp ignore null values")