Merge branch 'feat/TS-4243-3.0' into enh/TD-28945-pk

This commit is contained in:
Minglei Jin 2024-04-10 09:09:51 +08:00
commit d1cb05bba0
5 changed files with 119 additions and 37 deletions

View File

@ -896,6 +896,7 @@ typedef enum {
} EExecMode;
typedef struct {
int64_t version;
SRowKey rowKey;
int8_t dirty;
SColVal colVal;

View File

@ -570,20 +570,35 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
taosArrayClear(pVals);
int32_t dataIndex = 0;
int64_t ts = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k];
// primary timestamp column, for debug purpose
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData);
ts = *(int64_t*)colDataGetData(pColData, j);
tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);
}
if (IS_SET_NULL(pCol)) {
if (pCol->flags & COL_IS_KEY) {
qError("ts:%" PRId64 " Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
pCol->colId, pCol->type);
break;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) {
if (pCol->flags & COL_IS_KEY) {
qError("ts:%" PRId64 "Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
ts, pCol->colId, pCol->type);
break;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
dataIndex++;
@ -607,6 +622,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
}
SRow* pRow = NULL;
tqInfo("result column flag:%d", pTSchema->columns[1].flags);
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);

View File

@ -339,6 +339,7 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) {
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (pLastCol == NULL) return NULL;
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pLastColV1->ts;
pLastCol->rowKey.numOfPKs = 0;
pLastCol->dirty = pLastColV1->dirty;
@ -408,7 +409,7 @@ static SLastCol *tsdbCacheDeserialize(char const *value) {
if (!hasVersion) {
return tsdbCacheDeserializeV1(value);
}
return tsdbCacheDeserializeV2(value + sizeof(int64_t));
return tsdbCacheDeserializeV2(value);
}
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
@ -423,7 +424,7 @@ static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
SColVal *pColVal = &pLastCol->colVal;
size_t length = sizeof(int64_t) + sizeof(*pLastCol);
size_t length = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
length += pLastCol->rowKey.pks[i].nData;
@ -435,14 +436,11 @@ static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
// set version
*value = taosMemoryMalloc(length);
char *currentPos = *value;
*(int64_t *)currentPos = LAST_COL_VERSION;
currentPos += sizeof(int64_t);
// copy last col
SLastCol* pToLastCol = (SLastCol *)currentPos;
SLastCol* pToLastCol = (SLastCol *)(*value);
*pToLastCol = *pLastCol;
currentPos += sizeof(*pLastCol);
char *currentPos = *value + sizeof(*pLastCol);
// copy var data pks
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
@ -535,18 +533,22 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
return code;
}
static void reallocVarData(SColVal *pColVal) {
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
uint8_t *pVal = pColVal->value.pData;
if (pColVal->value.nData > 0) {
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
static void reallocVarDataVal(SValue *pValue) {
if (IS_VAR_DATA_TYPE(pValue->type)) {
uint8_t *pVal = pValue->pData;
if (pValue->nData > 0) {
pValue->pData = taosMemoryMalloc(pValue->nData);
memcpy(pValue->pData, pVal, pValue->nData);
} else {
pColVal->value.pData = NULL;
pValue->pData = NULL;
}
}
}
static void reallocVarData(SColVal *pColVal) {
reallocVarDataVal(&pColVal->value);
}
static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud) {
SLastCol *pLastCol = (SLastCol *)value;
@ -569,16 +571,26 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
SRowKey noneRowKey = {0};
noneRowKey.ts = TSKEY_MIN;
noneRowKey.numOfPKs = 0;
SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol noneCol = {
.version = LAST_COL_VERSION, .rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol *pLastCol = &noneCol;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData;
}
@ -923,6 +935,7 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal
int nData = 0;
// update rowkey
pLastCol->version = LAST_COL_VERSION;
pLastCol->rowKey.ts = pRowKey->ts;
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
@ -1015,7 +1028,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
}
@ -1032,7 +1044,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
}
@ -1078,6 +1089,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
// SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx);
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
SLastCol *PToFree = pLastCol;
if (IS_LAST_ROW_KEY(idxKey->key)) {
if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
@ -1096,9 +1108,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData;
}
@ -1128,9 +1148,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData;
}
@ -1143,10 +1171,9 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(value);
}
}
taosMemoryFree(pLastCol);
}
taosMemoryFreeClear(PToFree);
rocksdb_free(values_list[i]);
}
@ -1428,7 +1455,8 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
// still null, then make up a none col value
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
SLastCol noneCol = {.version = LAST_COL_VERSION,
.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) {
pLastCol = &noneCol;
@ -1446,9 +1474,16 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData;
}
@ -1524,15 +1559,23 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
SLRUCache *pCache = pTsdb->lruCache;
for (int i = 0, j = 0; i < num_keys && j < TARRAY_SIZE(remainCols); ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
SLastCol* PToFree = pLastCol;
SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[j];
if (pLastCol) {
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
reallocVarDataVal(pValue);
charge += pValue->nData;
}
}
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
reallocVarData(&pLastCol->colVal);
charge += pLastCol->colVal.value.nData;
}
@ -1543,11 +1586,14 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
}
SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
}
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j);
taosMemoryFree(pLastCol);
taosMemoryFreeClear(PToFree);
taosMemoryFree(values_list[i]);
} else {
++j;
@ -1591,12 +1637,17 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
}
reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol);
taosLRUCacheRelease(pCache, h, false);
} else {
SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
SLastCol noneCol = {.version = LAST_COL_VERSION,
.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol);
@ -1616,6 +1667,9 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
for (int8_t i = 0; i < lastCol.rowKey.numOfPKs; i++) {
reallocVarDataVal(&lastCol.rowKey.pks[i]);
}
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
@ -1702,7 +1756,6 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen);
}
taosMemoryFreeClear(pLastCol);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
@ -3232,7 +3285,9 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
for (int32_t i = 0; i < nCols; ++i) {
int16_t slotId = slotIds[i];
SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
SLastCol col = {.version = LAST_COL_VERSION,
.rowKey.ts = 0,
.colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
taosArrayPush(pColArray, &col);
}
*ppColArray = pColArray;
@ -3337,12 +3392,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
*pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
@ -3392,7 +3447,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal};
SLastCol lastCol = {.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
@ -3516,12 +3571,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
taosArraySet(pColArray, 0, &(SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
*pCol = (SLastCol){.version = LAST_COL_VERSION, .rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);

View File

@ -2997,6 +2997,15 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
if (FUNCTION_TYPE_LAST == funcType) {
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
if (pFunc->hasPk) {
if (LIST_LENGTH(pFunc->pParameterList) != 2) {
planError("last func which has pk but its parameter list length is not 2");
nodesClearList(cxt.pLastCols);
taosArrayDestroy(isDuplicateCol);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 1), lastRowScanOptSetColDataType, &cxt);
}
}
} else {
pNode = nodesListGetNode(pFunc->pParameterList, 0);

View File

@ -93,7 +93,7 @@ sql create stream streams21 trigger at_once ignore expired 0 ignore update 0 in
sql_error create stream streams22 trigger at_once ignore expired 0 ignore update 0 into streamt22 as select ts,1, b from rct1;
sql_error create stream streams23 trigger at_once ignore expired 0 ignore update 0 into streamt23 as select ts, a, b from rct1;
sql create stream streams23 trigger at_once ignore expired 0 ignore update 0 into streamt23 as select ts, a, b from rct1;
sql create stream streams24 trigger at_once ignore expired 0 ignore update 0 into streamt24(ts, a primary key, b) as select ts, a, b from rct1;
sql create stream streams25 trigger at_once ignore expired 0 ignore update 0 into rst6 as select ts, a, b from rct1;