diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e748cce643..69b2a2e6a3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -535,8 +535,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b if (asc) { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { - pDataBlock->info.pks[0].val = *(int32_t*) skey; - pDataBlock->info.pks[1].val = *(int32_t*) ekey; + GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, skey); + GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, ekey); } else { // todo refactor memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey)); pDataBlock->info.pks[0].nData = varDataLen(skey); @@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b } } else { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { - pDataBlock->info.pks[0].val = *(int32_t*) ekey; - pDataBlock->info.pks[1].val = *(int32_t*) skey; + GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, ekey); + GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, skey); } else { // todo refactor memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey)); pDataBlock->info.pks[0].nData = varDataLen(ekey); @@ -1491,6 +1491,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { blockDataAppendColInfo(pBlock, &colInfo); } + // prepare the pk buffer if necessary + if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) { + SValue* pVal = &pBlock->info.pks[0]; + + pVal->type = pDataBlock->info.pks[0].type; + pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); + + pVal = &pBlock->info.pks[1]; + pVal->type = pDataBlock->info.pks[1].type; + pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + } + if (copyData) { int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 5d1694ab90..1585c12ac1 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -593,12 +593,16 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) { SColVal *pColVal = NULL; - for (int32_t iRow = 0; iRow < nRow; iRow++) { + for (int32_t iRow = nRow - 1; iRow >= 0; --iRow) { SColVal *pColValT = tRowIterNext(aIter[iRow]); + while (pColValT->cid < pTSchema->columns[iCol].colId) { + pColValT = tRowIterNext(aIter[iRow]); + } // todo: take strategy according to the flag if (COL_VAL_IS_VALUE(pColValT)) { pColVal = pColValT; + break; } else if (COL_VAL_IS_NULL(pColValT)) { if (pColVal == NULL) { pColVal = pColValT; @@ -2880,8 +2884,12 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt char *data) { int32_t code = 0; if (data == NULL) { - for (int32_t i = 0; i < nRows; ++i) { - code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); + if (pColData->cflag & COL_IS_KEY) { + code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + } else { + for (int32_t i = 0; i < nRows; ++i) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); + } } goto _exit; } @@ -2890,8 +2898,13 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt for (int32_t i = 0; i < nRows; ++i) { int32_t offset = *((int32_t *)lengthOrbitmap + i); if (offset == -1) { - code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); - if (code) goto _exit; + if (pColData->cflag & COL_IS_KEY) { + code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + goto _exit; + } + if ((code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0))) { + goto _exit; + } } else { if (varDataTLen(data + offset) > bytes) { uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), @@ -2913,6 +2926,10 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt allValue = false; } } + if ((pColData->cflag & COL_IS_KEY) && !allValue) { + code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + goto _exit; + } if (allValue) { // optimize (todo) @@ -2951,6 +2968,10 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32 if (IS_VAR_DATA_TYPE(pColData->type)) { // var-length data type for (int32_t i = 0; i < pBind->num; ++i) { if (pBind->is_null && pBind->is_null[i]) { + if (pColData->cflag & COL_IS_KEY) { + code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + goto _exit; + } code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); if (code) goto _exit; } else if (pBind->length[i] > buffMaxLen) { @@ -2973,6 +2994,11 @@ int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32 allValue = true; } + if ((pColData->cflag & COL_IS_KEY) && !allValue) { + code = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + goto _exit; + } + if (allValue) { // optimize (todo) for (int32_t i = 0; i < pBind->num; ++i) { @@ -3002,91 +3028,6 @@ _exit: return code; } -#ifdef BUILD_NO_CALL -static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) { - int32_t code = 0; - - if (IS_VAR_DATA_TYPE(pColData->type)) { - int32_t nData1 = pColData->aOffset[i + 1] - pColData->aOffset[i]; - int32_t nData2 = (j < pColData->nVal - 1) ? pColData->aOffset[j + 1] - pColData->aOffset[j] - : pColData->nData - pColData->aOffset[j]; - uint8_t *pData = taosMemoryMalloc(TMAX(nData1, nData2)); - if (pData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - if (nData1 > nData2) { - memcpy(pData, pColData->pData + pColData->aOffset[i], nData1); - memcpy(pColData->pData + pColData->aOffset[i], pColData->pData + pColData->aOffset[j], nData2); - // memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1, - // pColData->aOffset[j] - pColData->aOffset[i + 1]); - memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1], - pColData->aOffset[j] - pColData->aOffset[i + 1]); - memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pData, nData1); - } else { - memcpy(pData, pColData->pData + pColData->aOffset[j], nData2); - memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i], nData1); - // memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] + - // nData1, - // pColData->aOffset[j] - pColData->aOffset[i + 1]); - memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1], - pColData->aOffset[j] - pColData->aOffset[i + 1]); - memcpy(pColData->pData + pColData->aOffset[i], pData, nData2); - } - for (int32_t k = i + 1; k <= j; ++k) { - pColData->aOffset[k] = pColData->aOffset[k] + nData2 - nData1; - } - - taosMemoryFree(pData); - } else { - uint64_t val; - memcpy(&val, &pColData->pData[TYPE_BYTES[pColData->type] * i], TYPE_BYTES[pColData->type]); - memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * i], &pColData->pData[TYPE_BYTES[pColData->type] * j], - TYPE_BYTES[pColData->type]); - memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * j], &val, TYPE_BYTES[pColData->type]); - } - -_exit: - return code; -} - -static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) { - ASSERT(i < j); - ASSERT(j < pColData->nVal); - - switch (pColData->flag) { - case HAS_NONE: - case HAS_NULL: - break; - case (HAS_NULL | HAS_NONE): { - uint8_t bv = GET_BIT1(pColData->pBitMap, i); - SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j)); - SET_BIT1(pColData->pBitMap, j, bv); - } break; - case HAS_VALUE: { - tColDataSwapValue(pColData, i, j); - } break; - case (HAS_VALUE | HAS_NONE): - case (HAS_VALUE | HAS_NULL): { - uint8_t bv = GET_BIT1(pColData->pBitMap, i); - SET_BIT1(pColData->pBitMap, i, GET_BIT1(pColData->pBitMap, j)); - SET_BIT1(pColData->pBitMap, j, bv); - tColDataSwapValue(pColData, i, j); - } break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): { - uint8_t bv = GET_BIT2(pColData->pBitMap, i); - SET_BIT2(pColData->pBitMap, i, GET_BIT2(pColData->pBitMap, j)); - SET_BIT2(pColData->pBitMap, j, bv); - tColDataSwapValue(pColData, i, j); - } break; - default: - ASSERT(0); - break; - } -} -#endif - static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) { int32_t code = TSDB_CODE_SUCCESS; @@ -3170,11 +3111,27 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S return code; } +static FORCE_INLINE void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) { + SColVal cv; + + key->ts = ((TSKEY *)aColData[0].pData)[iRow]; + key->numOfPKs = 0; + + for (int i = 1; i < nColData; i++) { + if (aColData[i].cflag & COL_IS_KEY) { + ASSERT(aColData->flag == HAS_VALUE); + tColDataGetValue4(&aColData[i], iRow, &cv); + key->pks[key->numOfPKs++] = cv.value; + } else { + break; + } + } +} + static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t mid, int32_t end, int32_t nColData) { SColData *aDstColData = NULL; - TSKEY *aKey = (TSKEY *)aColData[0].pData; - - int32_t i = start, j = mid + 1, k = 0; + int32_t i = start, j = mid + 1, k = 0; + SRowKey keyi, keyj; if (end > start) { aDstColData = taosMemoryCalloc(1, sizeof(SColData) * nColData); @@ -3184,30 +3141,25 @@ static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t if (aDstColData == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - /* - for (int32_t i = 0; i < nColData; i++) { - tColDataCopy(&aColData[i], &aDstColData[i], tColDataDefaultMalloc, NULL); - } - */ } + tColDataArrGetRowKey(aColData, nColData, i, &keyi); + tColDataArrGetRowKey(aColData, nColData, j, &keyj); while (i <= mid && j <= end) { - if (aKey[i] <= aKey[j]) { - // tColDataCopyRow(aColData, i++, aDstColData, k++); + if (tRowKeyCompare(&keyi, &keyj) <= 0) { tColDataCopyRowAppend(aColData, i++, aDstColData, nColData); + tColDataArrGetRowKey(aColData, nColData, i, &keyi); } else { - // tColDataCopyRow(aColData, j++, aDstColData, k++); tColDataCopyRowAppend(aColData, j++, aDstColData, nColData); + tColDataArrGetRowKey(aColData, nColData, j, &keyj); } } while (i <= mid) { - // tColDataCopyRow(aColData, i++, aDstColData, k++); tColDataCopyRowAppend(aColData, i++, aDstColData, nColData); } while (j <= end) { - // tColDataCopyRow(aColData, j++, aDstColData, k++); tColDataCopyRowAppend(aColData, j++, aDstColData, nColData); } @@ -3454,12 +3406,16 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd / } static void tColDataMerge(SColData *aColData, int32_t nColData) { int32_t iStart = 0; + SRowKey keyStart, keyEnd; + for (;;) { if (iStart >= aColData[0].nVal - 1) break; + tColDataArrGetRowKey(aColData, nColData, iStart, &keyStart); int32_t iEnd = iStart + 1; while (iEnd < aColData[0].nVal) { - if (((TSKEY *)aColData[0].pData)[iEnd] != ((TSKEY *)aColData[0].pData)[iStart]) break; + tColDataArrGetRowKey(aColData, nColData, iEnd, &keyEnd); + if (tRowKeyCompare(&keyStart, &keyEnd) != 0) break; iEnd++; } @@ -3473,6 +3429,7 @@ static void tColDataMerge(SColData *aColData, int32_t nColData) { iStart++; } } + void tColDataSortMerge(SArray *colDataArr) { int32_t nColData = TARRAY_SIZE(colDataArr); SColData *aColData = (SColData *)TARRAY_DATA(colDataArr); @@ -3486,11 +3443,17 @@ void tColDataSortMerge(SArray *colDataArr) { int8_t doSort = 0; int8_t doMerge = 0; // scan ------- - TSKEY *aKey = (TSKEY *)aColData[0].pData; + SRowKey lastKey; + tColDataArrGetRowKey(aColData, nColData, 0, &lastKey); for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) { - if (aKey[iVal] > aKey[iVal - 1]) { + SRowKey key; + tColDataArrGetRowKey(aColData, nColData, iVal, &key); + + int32_t c = tRowKeyCompare(&lastKey, &key); + if (c < 0) { + lastKey = key; continue; - } else if (aKey[iVal] < aKey[iVal - 1]) { + } else if (c > 0) { doSort = 1; break; } else { @@ -3504,11 +3467,17 @@ void tColDataSortMerge(SArray *colDataArr) { } if (doMerge != 1) { + tColDataArrGetRowKey(aColData, nColData, 0, &lastKey); for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) { - if (aKey[iVal] == aKey[iVal - 1]) { + SRowKey key; + tColDataArrGetRowKey(aColData, nColData, iVal, &key); + + int32_t c = tRowKeyCompare(&lastKey, &key); + if (c == 0) { doMerge = 1; break; } + lastKey = key; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ffb24fbc0e..3a178f7ade 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -664,10 +664,13 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[0], klen); } + taosMemoryFreeClear(pLastCol); + pLastCol = tsdbCacheDeserialize(values_list[1]); if (NULL != pLastCol) { rocksdb_writebatch_delete(wb, keys_list[1], klen); } + taosMemoryFreeClear(pLastCol); rocksdb_free(values_list[0]); rocksdb_free(values_list[1]); @@ -675,9 +678,7 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool erase = false; LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen); if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); erase = true; - taosLRUCacheRelease(pTsdb->lruCache, h, erase); } if (erase) { @@ -687,16 +688,12 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, erase = false; h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen); if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); erase = true; - taosLRUCacheRelease(pTsdb->lruCache, h, erase); } if (erase) { taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); } - - taosMemoryFree(pLastCol); } taosMemoryFree(keys_list[0]); @@ -1705,13 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[i], klen); } + + taosMemoryFreeClear(pLastCol); + pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) { rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); } taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + taosMemoryFreeClear(pLastCol); - taosMemoryFree(pLastCol); rocksdb_free(values_list[i]); rocksdb_free(values_list[i + num_keys]); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9fcc10e396..a8a4ced517 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1678,11 +1678,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* __compar_fn_t compFn = pReader->pkComparFn; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; - SRowKey* pSttKey = &(SRowKey){0}; + SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); - } else { - pSttKey = NULL; } SRowKey k; @@ -1714,10 +1712,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - SRowKey minKey; + SRowKey minKey = k; if (pReader->info.order == TSDB_ORDER_ASC) { - minKey = k; // chosen the minimum value - if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) { minKey = *pfKey; } @@ -1726,8 +1722,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = *pSttKey; } } else { - minKey = k; - if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) { minKey = *pfKey; } @@ -1882,11 +1876,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); - SRowKey* pSttKey = &(SRowKey){0}; + SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); - } else { - pSttKey = NULL; + pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } SRowKey* pfKey = &(SRowKey){0}; @@ -1925,10 +1917,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - SRowKey minKey = {0}; + SRowKey minKey = k; if (ASCENDING_TRAVERSE(pReader->info.order)) { - minKey = k; // let's find the minimum - if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) { minKey = ik; } @@ -1941,7 +1931,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = *pSttKey; } } else { - minKey = k; // let find the maximum ts value if (pkCompEx(compFn, &ik, &minKey) > 0) { minKey = ik; } @@ -1968,7 +1957,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } - if (pkCompEx(compFn, &minKey, &pBlockScanInfo->lastProcKey) == 0) { + if (pkCompEx(compFn, &minKey, pSttKey) == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 1c70fd8bb6..55b803f6d4 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -162,6 +162,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode); +int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index dfe30c9f96..06f63f5f04 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -189,8 +189,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } int64_t lastTs = TSKEY_MIN; - bool updateLastRow = false; - bool disorderTs = false; + bool needSortMerge = false; for (int32_t j = 0; j < rows; ++j) { // iterate by row taosArrayClear(pVals); @@ -258,11 +257,9 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); } else { - if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { - if (*(int64_t*)var == lastTs) { - updateLastRow = true; - } else if (*(int64_t*)var < lastTs) { - disorderTs = true; + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && !needSortMerge) { + if (*(int64_t*)var <= lastTs) { + needSortMerge = true; } else { lastTs = *(int64_t*)var; } @@ -287,17 +284,10 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); goto _end; } - if (updateLastRow) { - updateLastRow = false; - SRow** lastRow = taosArrayPop(tbData.aRowP); - tRowDestroy(*lastRow); - taosArrayPush(tbData.aRowP, &pRow); - } else { - taosArrayPush(tbData.aRowP, &pRow); - } + taosArrayPush(tbData.aRowP, &pRow); } - if (disorderTs) { + if (needSortMerge) { if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) || (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) { goto _end; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 97b9b00efb..9749dffc13 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -250,6 +250,34 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { return pBlock; } +int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) { + SDataBlockInfo* pBlockInfo = &pDataBlock->info; + + for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) { + SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i); + if (pItem->isPk) { + SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId); + pBlockInfo->pks[0].type = pInfoData->info.type; + pBlockInfo->pks[1].type = pInfoData->info.type; + + if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { + pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + if (pBlockInfo->pks[0].pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + if (pBlockInfo->pks[1].pData == NULL) { + taosMemoryFreeClear(pBlockInfo->pks[0].pData); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + } + } + + return TSDB_CODE_SUCCESS; +} + EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { SMetaReader* mr = (SMetaReader*)pContext; if (nodeType(*pNode) == QUERY_NODE_COLUMN) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a51e627272..0d4c536dec 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1196,23 +1196,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); - { // todo :refactor: - SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info; - for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) { - SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i); - if (pItem->isPk) { - SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId); - pBlockInfo->pks[0].type = pInfoData->info.type; - pBlockInfo->pks[1].type = pInfoData->info.type; - - if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { - pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); - pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); - } - } - } - } code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4418,6 +4403,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->bSortRowId = false; } + prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); + pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index f779c1fa25..b88e1474d7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -138,7 +138,7 @@ typedef struct SElapsedInfo { typedef struct STwaInfo { double dOutput; - bool isNull; + int64_t numOfElems; SPoint1 p; STimeWindow win; } STwaInfo; @@ -600,10 +600,10 @@ bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) { SFuncInputRowIter* pIter = &pCtx->rowIter; if (pCtx->hasPrimaryKey) { - if (pCtx->order == TSDB_ORDER_DESC) { - return funcInputGetNextRowDescPk(pIter, pRow); - } else { + if (pCtx->order == TSDB_ORDER_ASC) { return funcInputGetNextRowAscPk(pIter, pRow); + } else { + return funcInputGetNextRowDescPk(pIter, pRow); } } else { return funcInputGetNextRowNoPk(pIter, pRow); @@ -5556,7 +5556,7 @@ bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { } STwaInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - pInfo->isNull = false; + pInfo->numOfElems = 0; pInfo->p.key = INT64_MIN; pInfo->win = TSWINDOW_INITIALIZER; return true; @@ -5581,13 +5581,11 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - - STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - SPoint1* last = &pInfo->p; - int32_t numOfElems = 0; + STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + SPoint1* last = &pInfo->p; if (IS_NULL_TYPE(pInputCol->info.type)) { - pInfo->isNull = true; + pInfo->numOfElems = 0; goto _twa_over; } @@ -5605,7 +5603,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { pInfo->dOutput += twa_get_area(pCtx->start, *last); pInfo->win.skey = pCtx->start.key; - numOfElems++; + pInfo->numOfElems++; break; } } else if (pInfo->p.key == INT64_MIN) { @@ -5619,7 +5617,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { GET_TYPED_DATA(last->val, double, pInputCol->info.type, row.pData); pInfo->win.skey = last->key; - numOfElems++; + pInfo->numOfElems++; break; } } @@ -5633,7 +5631,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(int8_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5651,7 +5649,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(int16_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5668,7 +5666,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(int32_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5685,7 +5683,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(int64_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5702,7 +5700,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(float_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5719,7 +5717,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(double*)row.pData); if (pInfo->p.key == st.key) { @@ -5736,7 +5734,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(uint8_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5753,7 +5751,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(uint16_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5770,7 +5768,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(uint32_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5787,7 +5785,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (row.isDataNull) { continue; } - numOfElems++; + pInfo->numOfElems++; INIT_INTP_POINT(st, row.ts, *(uint64_t*)row.pData); if (pInfo->p.key == st.key) { @@ -5808,16 +5806,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { if (pCtx->end.key != INT64_MIN) { pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end); pInfo->p = pCtx->end; - numOfElems += 1; + pInfo->numOfElems += 1; } pInfo->win.ekey = pInfo->p.key; _twa_over: - if (numOfElems == 0) { - pInfo->isNull = true; - } - SET_VAL(pResInfo, 1, 1); return TSDB_CODE_SUCCESS; } @@ -5838,7 +5832,7 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); STwaInfo* pInfo = (STwaInfo*)GET_ROWCELL_INTERBUF(pResInfo); - if (pInfo->isNull == true) { + if (pInfo->numOfElems == 0) { pResInfo->numOfRes = 0; } else { if (pInfo->win.ekey == pInfo->win.skey) { diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 71d832dc1f..bbcd27bd65 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1392,7 +1392,7 @@ int32_t initTableColSubmitData(STableDataCxt* pTableCxt) { if (NULL == pCol) { return TSDB_CODE_OUT_OF_MEMORY; } - tColDataInit(pCol, pSchema->colId, pSchema->type, 0); + tColDataInit(pCol, pSchema->colId, pSchema->type, pSchema->flags); } return TSDB_CODE_SUCCESS; diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index 86d010209d..81c5b66185 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -3390,8 +3390,6 @@ class TDTestCase: tdSql.execute(f"insert into {dbname}.{ctbname1} values ('2020-02-01 00:00:15', 15, 15, 15, 15, 15.0, 15.0, true, 'varchar', 'nchar')") tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:14') every(1s) fill(null)") - tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:15') every(1s) fill(null)") - tdSql.error(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname} partition by tbname range('2020-02-01 00:00:00', '2020-02-01 00:00:18') every(1s) fill(null)") tdLog.printNoPrefix("======step 14: test interp ignore null values")