Merge branch 'feat/TS-4243-3.0' of github.com:taosdata/TDengine into TEST/3.0/TS-4243

This commit is contained in:
Chris Zhai 2024-04-08 08:55:48 +08:00
commit 2b3eb443a0
14 changed files with 447 additions and 185 deletions

View File

@ -806,6 +806,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003) #define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
#define TSDB_CODE_SML_NOT_SAME_TYPE TAOS_DEF_ERROR_CODE(0, 0x3004) #define TSDB_CODE_SML_NOT_SAME_TYPE TAOS_DEF_ERROR_CODE(0, 0x3004)
#define TSDB_CODE_SML_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x3005) #define TSDB_CODE_SML_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x3005)
#define TSDB_CODE_SML_NOT_SUPPORT_PK TAOS_DEF_ERROR_CODE(0, 0x3006)
//tsma //tsma
#define TSDB_CODE_TSMA_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x3100) #define TSDB_CODE_TSMA_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x3100)

View File

@ -310,6 +310,16 @@ int32_t smlJoinMeasureTag(SSmlLineInfo *elements){
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool smlIsPKTable(STableMeta *pTableMeta){
for(int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){
if(pTableMeta->schema[i].flags & COL_IS_KEY){
return true;
}
}
return false;
}
int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) { int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
bool isSameMeasure = IS_SAME_SUPER_TABLE; bool isSameMeasure = IS_SAME_SUPER_TABLE;
if(isSameMeasure) { if(isSameMeasure) {
@ -328,6 +338,11 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
info->currSTableMeta = sMeta->tableMeta; info->currSTableMeta = sMeta->tableMeta;
info->maxTagKVs = sMeta->tags; info->maxTagKVs = sMeta->tags;
info->maxColKVs = sMeta->cols; info->maxColKVs = sMeta->cols;
if(smlIsPKTable(sMeta->tableMeta)){
terrno = TSDB_CODE_SML_NOT_SUPPORT_PK;
return -1;
}
return 0; return 0;
} }
@ -1063,6 +1078,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end; goto end;
} }
} else if (code == TSDB_CODE_SUCCESS) { } else if (code == TSDB_CODE_SUCCESS) {
if(smlIsPKTable(pTableMeta)){
code = TSDB_CODE_SML_NOT_SUPPORT_PK;
goto end;
}
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
HASH_NO_LOCK); HASH_NO_LOCK);
for (uint16_t i = pTableMeta->tableInfo.numOfColumns; for (uint16_t i = pTableMeta->tableInfo.numOfColumns;

View File

@ -895,11 +895,17 @@ typedef enum {
} EExecMode; } EExecMode;
typedef struct { typedef struct {
TSKEY ts; SRowKey rowKey;
int8_t dirty; int8_t dirty;
SColVal colVal; SColVal colVal;
} SLastCol; } SLastCol;
typedef struct {
TSKEY ts;
int8_t dirty;
SColVal colVal;
} SLastColV1;
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);

View File

@ -234,9 +234,9 @@ int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapp
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow); int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid); int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type); int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, bool hasPrimayKey);
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type); int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);

View File

@ -449,18 +449,20 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type); tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type);
} else if (deltaCol == -1) { } else if (deltaCol == -1) {
int16_t cid = -1; int16_t cid = -1;
int8_t col_type = -1; bool hasPrimaryKey = false;
if (onCols >= 2) {
hasPrimaryKey = (oStbEntry.stbEntry.schemaRow.pSchema[1].flags & COL_IS_KEY) ? true : false;
}
for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) { for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) {
if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) { if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) {
cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId; cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId;
col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type;
break; break;
} }
} }
if (cid != -1) { if (cid != -1) {
metaGetSubtables(pMeta, pReq->suid, uids); metaGetSubtables(pMeta, pReq->suid, uids);
tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type); tsdbCacheDropSTableColumn(pTsdb, uids, cid, hasPrimaryKey);
} }
} }
if (uids) taosArrayDestroy(uids); if (uids) taosArrayDestroy(uids);
@ -1478,6 +1480,11 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_COL_SUBSCRIBED; terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err; goto _err;
} }
bool hasPrimayKey = false;
if (pSchema->nCols >= 2) {
hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false;
}
pSchema->version++; pSchema->version++;
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
if (tlen) { if (tlen) {
@ -1489,9 +1496,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pColumn->colId; int16_t cid = pColumn->colId;
int8_t col_type = pColumn->type;
(void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type); (void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey);
} }
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:

View File

@ -127,12 +127,25 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
#define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t)) #define ROCKS_KEY_LEN (sizeof(tb_uid_t) + sizeof(int16_t) + sizeof(int8_t))
enum {
LFLAG_LAST_ROW = 0,
LFLAG_LAST = 1,
LFLAG_PRIMARY_KEY = (1 << 4),
};
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int16_t cid; int16_t cid;
int8_t ltype; int8_t lflag;
} SLastKey; } SLastKey;
#define LAST_COL_VERSION_BASE (((int64_t)(0x1)) << 63)
#define LAST_COL_VERSION (LAST_COL_VERSION_BASE + 2)
#define HAS_PRIMARY_KEY(k) (((k).lflag & LFLAG_PRIMARY_KEY) == LFLAG_PRIMARY_KEY)
#define IS_LAST_ROW_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST_ROW)
#define IS_LAST_KEY(k) (((k).lflag & LFLAG_LAST) == LFLAG_LAST)
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN); vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
@ -167,9 +180,9 @@ static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t
return 1; return 1;
} }
if (lhs->ltype < rhs->ltype) { if (lhs->lflag < rhs->lflag) {
return -1; return -1;
} else if (lhs->ltype > rhs->ltype) { } else if (lhs->lflag > rhs->lflag) {
return 1; return 1;
} }
@ -322,16 +335,62 @@ static void rocksMayWrite(STsdb *pTsdb, bool force, bool read, bool lock) {
} }
} }
static SLastCol *tsdbCacheDeserialize(char const *value) { // note: new object do not own colVal's resource, just copy the pointer
static SLastCol *tsdbCacheConvertLastColV1(SLastColV1 *pLastColV1) {
SLastCol *pLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
if (pLastCol == NULL) return NULL;
pLastCol->rowKey.ts = pLastColV1->ts;
pLastCol->rowKey.numOfPKs = 0;
pLastCol->dirty = pLastColV1->dirty;
pLastCol->colVal = pLastColV1->colVal;
return pLastCol;
}
static SLastCol *tsdbCacheDeserializeV1(char const *value) {
if (!value) { if (!value) {
return NULL; return NULL;
} }
SLastCol *pLastCol = (SLastCol *)value; SLastColV1 *pLastColV1 = (SLastColV1 *)value;
SColVal *pColVal = &pLastCol->colVal; SColVal *pColVal = &pLastColV1->colVal;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pColVal->value.pData = (char *)value + sizeof(*pLastCol); pColVal->value.pData = (char *)value + sizeof(*pLastColV1);
} else {
pColVal->value.pData = NULL;
}
}
return tsdbCacheConvertLastColV1(pLastColV1);
}
static SLastCol *tsdbCacheDeserializeV2(char const *value) {
if (!value) {
return NULL;
}
SLastCol *pLastCol = taosMemoryMalloc(sizeof(SLastCol));
*pLastCol = *(SLastCol *)(value);
char* currentPos = (char *)value + sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue* pValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pValue->type)) {
if (pValue->nData > 0) {
pValue->pData = currentPos;
currentPos += pValue->nData;
} else {
pValue->pData = NULL;
}
}
}
SColVal *pColVal = &pLastCol->colVal;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
if (pColVal->value.nData > 0) {
pColVal->value.pData = currentPos;
currentPos += pColVal->value.nData;
} else { } else {
pColVal->value.pData = NULL; pColVal->value.pData = NULL;
} }
@ -340,25 +399,68 @@ static SLastCol *tsdbCacheDeserialize(char const *value) {
return pLastCol; return pLastCol;
} }
static SLastCol *tsdbCacheDeserialize(char const *value) {
if (!value) {
return NULL;
}
bool hasVersion = ((*(int64_t *)value) & LAST_COL_VERSION_BASE) == LAST_COL_VERSION_BASE;
if (!hasVersion) {
return tsdbCacheDeserializeV1(value);
}
return tsdbCacheDeserializeV2(value + sizeof(int64_t));
}
static uint32_t tsdbCacheCopyVarData(SValue *from, SValue *to) {
ASSERT(from->nData >= 0);
if (from->nData > 0) {
memcpy(to->pData, from->pData, from->nData);
}
to->type = from->type;
to->nData = from->nData;
return from->nData;
}
static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { static void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) {
SColVal *pColVal = &pLastCol->colVal; SColVal *pColVal = &pLastCol->colVal;
size_t length = sizeof(*pLastCol); size_t length = sizeof(int64_t) + sizeof(*pLastCol);
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
length += pLastCol->rowKey.pks[i].nData;
}
}
if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
length += pColVal->value.nData; length += pColVal->value.nData;
} }
*value = taosMemoryMalloc(length);
*(SLastCol *)(*value) = *pLastCol; // set version
if (IS_VAR_DATA_TYPE(pColVal->value.type)) { *value = taosMemoryMalloc(length);
uint8_t *pVal = pColVal->value.pData; char *currentPos = *value;
SColVal *pDColVal = &((SLastCol *)(*value))->colVal; *(int64_t *)currentPos = LAST_COL_VERSION;
pDColVal->value.pData = *value + sizeof(*pLastCol); currentPos += sizeof(int64_t);
if (pColVal->value.nData > 0) {
memcpy(pDColVal->value.pData, pVal, pColVal->value.nData); // copy last col
} else { SLastCol* pToLastCol = (SLastCol *)currentPos;
pDColVal->value.pData = NULL; *pToLastCol = *pLastCol;
currentPos += sizeof(*pLastCol);
// copy var data pks
for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
SValue *pFromValue = &pLastCol->rowKey.pks[i];
if (IS_VAR_DATA_TYPE(pFromValue->type)) {
SValue *pToValue = &pToLastCol->rowKey.pks[i];
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
} }
} }
// copy var data value
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
SValue *pFromValue = &pColVal->value;
SValue *pToValue = &pToLastCol->colVal.value;
pToValue->pData = (pFromValue->nData == 0) ? NULL : currentPos;
currentPos += tsdbCacheCopyVarData(pFromValue, pToValue);
}
*size = length; *size = length;
} }
@ -459,13 +561,16 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
taosMemoryFree(value); taosMemoryFree(value);
} }
static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t lflag) {
int32_t code = 0; int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1}; SRowKey noneRowKey = {0};
SLastCol *pLastCol = &noneCol; noneRowKey.ts = TSKEY_MIN;
noneRowKey.numOfPKs = 0;
SLastCol noneCol = {.rowKey = noneRowKey, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol *pLastCol = &noneCol;
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol; *pTmpLastCol = *pLastCol;
@ -477,7 +582,7 @@ static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, i
charge += pLastCol->colVal.value.nData; charge += pLastCol->colVal.value.nData;
} }
SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; SLastKey *pLastKey = &(SLastKey){.lflag = lflag, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL, LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) { if (status != TAOS_LRU_STATUS_OK) {
@ -519,7 +624,7 @@ int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
return code; return code;
} }
static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) { static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimaryKey) {
int32_t code = 0; int32_t code = 0;
// build keys & multi get from rocks // build keys & multi get from rocks
@ -527,9 +632,11 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t)); size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
const size_t klen = ROCKS_KEY_LEN; const size_t klen = ROCKS_KEY_LEN;
int8_t lflag = hasPrimaryKey ? LFLAG_PRIMARY_KEY : 0;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
keys_list[0] = keys; keys_list[0] = keys;
keys_list[1] = keys + sizeof(SLastKey); keys_list[1] = keys + sizeof(SLastKey);
@ -588,6 +695,8 @@ static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid,
if (erase) { if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen); taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
} }
taosMemoryFree(pLastCol);
} }
taosMemoryFree(keys_list[0]); taosMemoryFree(keys_list[0]);
@ -606,13 +715,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
if (suid < 0) { if (suid < 0) {
int nCols = pSchemaRow->nCols; int8_t lflag = 0;
int nCols = pSchemaRow->nCols;
if (nCols >= 2) {
lflag = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
}
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
int16_t cid = pSchemaRow->pSchema[i].colId; int16_t cid = pSchemaRow->pSchema[i].colId;
int8_t col_type = pSchemaRow->pSchema[i].type; int8_t col_type = pSchemaRow->pSchema[i].type;
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW);
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST);
} }
} else { } else {
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
@ -622,13 +736,18 @@ int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrap
return -1; return -1;
} }
int nCols = pTSchema->numOfCols; int8_t lflag = 0;
int nCols = pTSchema->numOfCols;
if (nCols >= 2) {
lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
}
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
int16_t cid = pTSchema->columns[i].colId; int16_t cid = pTSchema->columns[i].colId;
int8_t col_type = pTSchema->columns[i].type; int8_t col_type = pTSchema->columns[i].type;
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST_ROW);
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1); (void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, lflag | LFLAG_LAST);
} }
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
@ -646,14 +765,17 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
(void)tsdbCacheCommitNoLock(pTsdb); (void)tsdbCacheCommitNoLock(pTsdb);
if (suid < 0) { if (pSchemaRow != NULL) {
int nCols = pSchemaRow->nCols; bool hasPrimayKey = false;
int nCols = pSchemaRow->nCols;
if (nCols >= 2) {
hasPrimayKey = (pSchemaRow->pSchema[1].flags & COL_IS_KEY) ? true : false;
}
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
int16_t cid = pSchemaRow->pSchema[i].colId; int16_t cid = pSchemaRow->pSchema[i].colId;
int8_t col_type = pSchemaRow->pSchema[i].type; int8_t col_type = pSchemaRow->pSchema[i].type;
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
} }
} else { } else {
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
@ -663,13 +785,16 @@ int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWra
return -1; return -1;
} }
int nCols = pTSchema->numOfCols; bool hasPrimayKey = false;
int nCols = pTSchema->numOfCols;
if (nCols >= 2) {
hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
}
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
int16_t cid = pTSchema->columns[i].colId; int16_t cid = pTSchema->columns[i].colId;
int8_t col_type = pTSchema->columns[i].type; int8_t col_type = pTSchema->columns[i].type;
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
} }
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
@ -698,13 +823,17 @@ int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
for (int i = 0; i < TARRAY_SIZE(uids); ++i) { for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
int nCols = pTSchema->numOfCols; bool hasPrimayKey = false;
int nCols = pTSchema->numOfCols;
if (nCols >= 2) {
hasPrimayKey = (pTSchema->columns[1].flags & COL_IS_KEY) ? true : false;
}
for (int i = 0; i < nCols; ++i) { for (int i = 0; i < nCols; ++i) {
int16_t cid = pTSchema->columns[i].colId; int16_t cid = pTSchema->columns[i].colId;
int8_t col_type = pTSchema->columns[i].type; int8_t col_type = pTSchema->columns[i].type;
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
} }
} }
@ -732,15 +861,14 @@ int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t
return code; return code;
} }
int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) { int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, bool hasPrimayKey) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
(void)tsdbCacheCommitNoLock(pTsdb); (void)tsdbCacheCommitNoLock(pTsdb);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
rocksMayWrite(pTsdb, true, false, true); rocksMayWrite(pTsdb, true, false, true);
@ -768,7 +896,7 @@ int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t
return code; return code;
} }
int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) { int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, bool hasPrimayKey) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
@ -778,8 +906,7 @@ int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_
for (int i = 0; i < TARRAY_SIZE(uids); ++i) { for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i]; int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0); (void)tsdbCacheDropTableColumn(pTsdb, uid, cid, hasPrimayKey);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
} }
rocksMayWrite(pTsdb, true, false, true); rocksMayWrite(pTsdb, true, false, true);
@ -794,6 +921,58 @@ typedef struct {
SLastKey key; SLastKey key;
} SIdxKey; } SIdxKey;
static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal *pColVal) {
uint8_t *pVal = NULL;
int nData = 0;
// update rowkey
pLastCol->rowKey.ts = pRowKey->ts;
pLastCol->rowKey.numOfPKs = pRowKey->numOfPKs;
for (int8_t i = 0; i < pRowKey->numOfPKs; i++) {
SValue *pPKValue = &pLastCol->rowKey.pks[i];
SValue *pNewPKValue = &pRowKey->pks[i];
if (IS_VAR_DATA_TYPE(pPKValue->type)) {
pVal = pPKValue->pData;
nData = pPKValue->nData;
}
*pPKValue = *pNewPKValue;
if (IS_VAR_DATA_TYPE(pPKValue->type)) {
if (nData < pPKValue->nData) {
taosMemoryFree(pVal);
pPKValue->pData = taosMemoryCalloc(1, pNewPKValue->nData);
} else {
pPKValue->pData = pVal;
}
if (pNewPKValue->nData) {
memcpy(pPKValue->pData, pNewPKValue->pData, pNewPKValue->nData);
}
}
}
// update colval
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
nData = pLastCol->colVal.value.nData;
pVal = pLastCol->colVal.value.pData;
}
pLastCol->colVal = *pColVal;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
if (nData < pColVal->value.nData) {
taosMemoryFree(pVal);
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
} else {
pLastCol->colVal.value.pData = pVal;
}
if (pColVal->value.nData) {
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
if (!pLastCol->dirty) {
pLastCol->dirty = 1;
}
}
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
@ -821,46 +1000,28 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
// 3, build keys & multi get from rocks // 3, build keys & multi get from rocks
int num_keys = TARRAY_SIZE(aColVal); int num_keys = TARRAY_SIZE(aColVal);
TSKEY keyTs = TSDBROW_TS(pRow);
SArray *remainCols = NULL; SArray *remainCols = NULL;
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
STsdbRowKey tsdbRowKey = {0};
tsdbRowGetKey(pRow, &tsdbRowKey);
SRowKey *pRowKey = &tsdbRowKey.key;
int8_t lflag = (pRowKey->numOfPKs != 0) ? LFLAG_PRIMARY_KEY : 0;
taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->lruMutex);
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i);
int16_t cid = pColVal->cid; int16_t cid = pColVal->cid;
SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = cid}; SLastKey *key = &(SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN; size_t klen = ROCKS_KEY_LEN;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (pLastCol->ts <= keyTs) { if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
uint8_t *pVal = NULL; tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
int nData = pLastCol->colVal.value.nData;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
pVal = pLastCol->colVal.value.pData;
}
pLastCol->ts = keyTs;
pLastCol->colVal = *pColVal;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
if (nData < pColVal->value.nData) {
taosMemoryFree(pVal);
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
} else {
pLastCol->colVal.value.pData = pVal;
}
if (pColVal->value.nData) {
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
if (!pLastCol->dirty) {
pLastCol->dirty = 1;
}
} }
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} else { } else {
if (!remainCols) { if (!remainCols) {
@ -870,36 +1031,14 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
} }
if (COL_VAL_IS_VALUE(pColVal)) { if (COL_VAL_IS_VALUE(pColVal)) {
key->ltype = 1; key->lflag = lflag | LFLAG_LAST;
LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); LRUHandle *h = taosLRUCacheLookup(pCache, key, klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
if (pLastCol->ts <= keyTs) { if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) {
uint8_t *pVal = NULL; tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal);
int nData = pLastCol->colVal.value.nData;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
pVal = pLastCol->colVal.value.pData;
}
pLastCol->ts = keyTs;
pLastCol->colVal = *pColVal;
if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
if (nData < pColVal->value.nData) {
taosMemoryFree(pVal);
pLastCol->colVal.value.pData = taosMemoryCalloc(1, pColVal->value.nData);
} else {
pLastCol->colVal.value.pData = pVal;
}
if (pColVal->value.nData) {
memcpy(pLastCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
if (!pLastCol->dirty) {
pLastCol->dirty = 1;
}
} }
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} else { } else {
if (!remainCols) { if (!remainCols) {
@ -943,11 +1082,11 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (idxKey->key.ltype == 0) { if (IS_LAST_ROW_KEY(idxKey->key)) {
if (NULL == pLastCol || pLastCol->ts <= keyTs) { if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid}; // SLastKey key = (SLastKey){.ltype = 0, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
@ -976,10 +1115,10 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
} }
} else { } else {
if (COL_VAL_IS_VALUE(pColVal)) { if (COL_VAL_IS_VALUE(pColVal)) {
if (NULL == pLastCol || pLastCol->ts <= keyTs) { if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) {
char *value = NULL; char *value = NULL;
size_t vlen = 0; size_t vlen = 0;
tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); tsdbCacheSerialize(&(SLastCol){.rowKey = *pRowKey, .colVal = *pColVal}, &value, &vlen);
// SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid}; // SLastKey key = (SLastKey){.ltype = 1, .uid = uid, .cid = pColVal->cid};
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
@ -1007,6 +1146,8 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
taosMemoryFree(value); taosMemoryFree(value);
} }
} }
taosMemoryFree(pLastCol);
} }
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
@ -1221,7 +1362,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
SIdxKey *idxKey = taosArrayGet(remainCols, 0); SIdxKey *idxKey = taosArrayGet(remainCols, 0);
if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) { if (idxKey->key.cid != PRIMARYKEY_TIMESTAMP_COL_ID) {
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID}; SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = PRIMARYKEY_TIMESTAMP_COL_ID};
taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key}); taosArrayInsert(remainCols, 0, &(SIdxKey){0, *key});
} }
@ -1244,7 +1385,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = taosArrayGet(remainCols, i); SIdxKey *idxKey = taosArrayGet(remainCols, i);
slotIds[i] = pr->pSlotIds[idxKey->idx]; slotIds[i] = pr->pSlotIds[idxKey->idx];
if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) { if (idxKey->key.lflag == CACHESCAN_RETRIEVE_LAST >> 3) {
if (NULL == lastTmpIndexArray) { if (NULL == lastTmpIndexArray) {
lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
} }
@ -1290,7 +1431,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
} }
// still null, then make up a none col value // still null, then make up a none col value
SLastCol noneCol = {.ts = TSKEY_MIN, SLastCol noneCol = {.rowKey.ts = TSKEY_MIN,
.colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)}; .colVal = COL_VAL_NONE(idxKey->key.cid, pr->pSchema->columns[slotIds[i]].type)};
if (!pLastCol) { if (!pLastCol) {
pLastCol = &noneCol; pLastCol = &noneCol;
@ -1409,6 +1550,7 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
taosArraySet(pLastArray, idxKey->idx, &lastCol); taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j); taosArrayRemove(remainCols, j);
taosMemoryFree(pLastCol);
taosMemoryFree(values_list[i]); taosMemoryFree(values_list[i]);
} else { } else {
++j; ++j;
@ -1436,7 +1578,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; SLastKey *key = &(SLastKey){.lflag = ltype, .uid = uid, .cid = cid};
// for select last_row, last case // for select last_row, last case
int32_t funcType = FUNCTION_TYPE_CACHE_LAST; int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
@ -1444,7 +1586,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
} }
if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST); int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
key->ltype = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; key->lflag = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
} }
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
@ -1457,7 +1599,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
taosLRUCacheRelease(pCache, h, false); taosLRUCacheRelease(pCache, h, false);
} else { } else {
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; SLastCol noneCol = {.rowKey.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
taosArrayPush(pLastArray, &noneCol); taosArrayPush(pLastArray, &noneCol);
@ -1517,12 +1659,18 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
const size_t klen = ROCKS_KEY_LEN; const size_t klen = ROCKS_KEY_LEN;
int8_t lflag = 0;
if (num_keys >= 2) {
lflag = (pTSchema->columns[1].flags & COL_IS_KEY) ? LFLAG_PRIMARY_KEY : 0;
}
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
int16_t cid = pTSchema->columns[i].colId; int16_t cid = pTSchema->columns[i].colId;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey)); char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid}; ((SLastKey *)keys)[0] = (SLastKey){.lflag = lflag | LFLAG_LAST, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid}; ((SLastKey *)keys)[1] = (SLastKey){.lflag = lflag | LFLAG_LAST_ROW, .uid = uid, .cid = cid};
keys_list[i] = keys; keys_list[i] = keys;
keys_list[num_keys + i] = keys + sizeof(SLastKey); keys_list[num_keys + i] = keys + sizeof(SLastKey);
@ -1554,15 +1702,16 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
taosThreadMutexLock(&pTsdb->rCache.rMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[i], klen); rocksdb_writebatch_delete(wb, keys_list[i], klen);
} }
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (NULL != pLastCol && (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey)) {
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen); rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
} }
taosThreadMutexUnlock(&pTsdb->rCache.rMutex); taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
taosMemoryFree(pLastCol);
rocksdb_free(values_list[i]); rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]); rocksdb_free(values_list[i + num_keys]);
@ -1575,7 +1724,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (pLastCol->dirty) { if (pLastCol->dirty) {
pLastCol->dirty = 0; pLastCol->dirty = 0;
} }
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) { if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
erase = true; erase = true;
} }
taosLRUCacheRelease(pTsdb->lruCache, h, erase); taosLRUCacheRelease(pTsdb->lruCache, h, erase);
@ -1591,7 +1740,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
if (pLastCol->dirty) { if (pLastCol->dirty) {
pLastCol->dirty = 0; pLastCol->dirty = 0;
} }
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) { if (pLastCol->rowKey.ts <= eKey && pLastCol->rowKey.ts >= sKey) {
erase = true; erase = true;
} }
taosLRUCacheRelease(pTsdb->lruCache, h, erase); taosLRUCacheRelease(pTsdb->lruCache, h, erase);
@ -3083,7 +3232,7 @@ static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray,
for (int32_t i = 0; i < nCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
int16_t slotId = slotIds[i]; int16_t slotId = slotIds[i];
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; SLastCol col = {.rowKey.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)};
taosArrayPush(pColArray, &col); taosArrayPush(pColArray, &col);
} }
*ppColArray = pColArray; *ppColArray = pColArray;
@ -3188,12 +3337,12 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
continue; continue;
} }
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
@ -3243,7 +3392,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData); taosMemoryFree(pLastCol->colVal.value.pData);
@ -3367,12 +3516,12 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
STColumn *pTColumn = &pTSchema->columns[0]; STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs})); *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
continue; continue;
} }
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; *pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) { if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
if (pColVal->value.nData > 0) { if (pColVal->value.nData > 0) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);

View File

@ -92,7 +92,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
p = (SFirstLastRes*)varDataVal(pRes[i]); p = (SFirstLastRes*)varDataVal(pRes[i]);
p->ts = pColVal->ts; p->ts = pColVal->rowKey.ts;
ts = p->ts; ts = p->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
// allNullRow = p->isNull & allNullRow; // allNullRow = p->isNull & allNullRow;
@ -399,12 +399,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t i = 0; i < pr->numOfCols; ++i) { for (int32_t i = 0; i < pr->numOfCols; ++i) {
int32_t slotId = slotIds[i]; int32_t slotId = slotIds[i];
if (slotId == -1) { if (slotId == -1) {
SLastCol p = {.ts = INT64_MIN, .colVal.value.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL}; SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL};
taosArrayPush(pLastCols, &p); taosArrayPush(pLastCols, &p);
continue; continue;
} }
struct STColumn* pCol = &pr->pSchema->columns[slotId]; struct STColumn* pCol = &pr->pSchema->columns[slotId];
SLastCol p = {.ts = INT64_MIN, .colVal.value.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
@ -431,7 +431,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
SLastCol* p = taosArrayGet(pLastCols, k); SLastCol* p = taosArrayGet(pLastCols, k);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
if (pColVal->ts > p->ts) { if (pColVal->rowKey.ts > p->rowKey.ts) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
if (!COL_VAL_IS_VALUE(&p->colVal)) { if (!COL_VAL_IS_VALUE(&p->colVal)) {
hasNotNullRow = false; hasNotNullRow = false;
@ -443,7 +443,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
hasRes = true; hasRes = true;
p->ts = pColVal->ts; p->rowKey.ts = pColVal->rowKey.ts;
if (k == 0) { if (k == 0) {
if (TARRAY_SIZE(pTableUidList) == 0) { if (TARRAY_SIZE(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &uid); taosArrayPush(pTableUidList, &uid);
@ -452,8 +452,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
} }
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { if (pColVal->rowKey.ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
singleTableLastTs = pColVal->ts; singleTableLastTs = pColVal->rowKey.ts;
} }
if (!IS_VAR_DATA_TYPE(pColVal->colVal.value.type)) { if (!IS_VAR_DATA_TYPE(pColVal->colVal.value.type)) {

View File

@ -194,18 +194,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++; pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version); pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMAX(pMemTable->maxVer, version); pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
/*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
}
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
}
*/
// if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey); tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
//}
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" at version %" PRId64, " at version %" PRId64,
@ -838,4 +828,4 @@ TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
pIter->row = pIter->pNode->row; pIter->row = pIter->pNode->row;
return pIter->pRow; return pIter->pRow;
} }

View File

@ -114,10 +114,10 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p1->pks[0].nData == p2->pks[0].nData) { if (p1->pks[0].nData == p2->pks[0].nData) {
return 0; return 0;
} else { } else {
return p1->pks[0].nData > p2->pks[0].nData?1:-1; return p1->pks[0].nData > p2->pks[0].nData ? 1 : -1;
} }
} else { } else {
return ret > 0? 1:-1; return ret > 0 ? 1 : -1;
} }
} else { } else {
return comparFn(&p1->pks[0].val, &p2->pks[0].val); return comparFn(&p1->pks[0].val, &p2->pks[0].val);

View File

@ -992,39 +992,39 @@ static bool overlapWithTimeWindow(STimeWindow* p1, STimeWindow* pQueryWindow, ST
} }
static int32_t sortUidComparFn(const void* p1, const void* p2) { static int32_t sortUidComparFn(const void* p1, const void* p2) {
const STimeWindow* px1 = p1; const SSttKeyRange* px1 = p1;
const STimeWindow* px2 = p2; const SSttKeyRange* px2 = p2;
if (px1->skey == px2->skey) {
return 0; int32_t ret = tRowKeyCompare(&px1, px2);
} else { return ret;
return px1->skey < px2->skey ? -1 : 1;
}
} }
bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo,
int32_t order) { int32_t order) {
// check if it overlap with del skyline // check if it overlap with del skyline
taosArraySort(pTimewindowList, sortUidComparFn); taosArraySort(pKeyRangeList, sortUidComparFn);
int32_t num = taosArrayGetSize(pTimewindowList); int32_t num = taosArrayGetSize(pKeyRangeList);
if (num == 0) { if (num == 0) {
return false; return false;
} }
STimeWindow* p = taosArrayGet(pTimewindowList, 0); SSttKeyRange* pRange = taosArrayGet(pKeyRangeList, 0);
if (overlapWithTimeWindow(p, pQueryWindow, pScanInfo, order)) { STimeWindow w = {.skey = pRange->skey.ts, .ekey = pRange->ekey.ts};
if (overlapWithTimeWindow(&w, pQueryWindow, pScanInfo, order)) {
return false; return false;
} }
for (int32_t i = 0; i < num - 1; ++i) { for (int32_t i = 0; i < num - 1; ++i) {
STimeWindow* p1 = taosArrayGet(pTimewindowList, i); SSttKeyRange* p1 = taosArrayGet(pKeyRangeList, i);
STimeWindow* p2 = taosArrayGet(pTimewindowList, i + 1); SSttKeyRange* p2 = taosArrayGet(pKeyRangeList, i + 1);
if (p1->ekey >= p2->skey) { if (p1->ekey.ts >= p2->skey.ts) {
return false; return false;
} }
bool overlap = overlapWithTimeWindow(p2, pQueryWindow, pScanInfo, order); STimeWindow w2 = {.skey = p2->skey.ts, .ekey = p2->ekey.ts};
bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order);
if (overlap) { if (overlap) {
return false; return false;
} }

View File

@ -206,28 +206,14 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn
cur.pks[0].type = pPkCol->info.type; cur.pks[0].type = pPkCol->info.type;
} }
// let's discard the duplicated ts
if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevKey.ts)) { if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevKey.ts)) {
// if (pPkCol == NULL) { return true;
return true;
/* } else {
tRowGetKeyFromColData(currentTs, pPkCol, curIndex, &cur);
if (tRowKeyCompare(&cur, &pSliceInfo->prevKey) == 0) {
return true;
}
}*/
} }
pSliceInfo->prevTsSet = true; pSliceInfo->prevTsSet = true;
tRowKeyAssign(&pSliceInfo->prevKey, &cur); tRowKeyAssign(&pSliceInfo->prevKey, &cur);
// todo handle next
if (currentTs == pSliceInfo->win.ekey && curIndex < rows - 1) {
int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1);
if (currentTs == nextTs) {
return true;
}
}
return false; return false;
} }
@ -735,7 +721,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
// check for duplicate timestamps // check for duplicate timestamps
if (checkDuplicateTimestamps(pSliceInfo, pTsCol, pPkCol, i, pBlock->info.rows)) { if (checkDuplicateTimestamps(pSliceInfo, pTsCol, pPkCol, i, pBlock->info.rows)) {
continue; continue;
// T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
} }
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
@ -754,6 +739,7 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
if (checkWindowBoundReached(pSliceInfo)) { if (checkWindowBoundReached(pSliceInfo)) {
break; break;
} }
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
saveBlockStatus(pSliceInfo, pBlock, i); saveBlockStatus(pSliceInfo, pBlock, i);
return; return;
@ -828,7 +814,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
// if reached here, meaning block processing finished naturally, // if reached here, meaning block processing finished naturally,
// or interpolation reach window upper bound // or interpolation reach window upper bound
pSliceInfo->pRemainRes = NULL; pSliceInfo->pRemainRes = NULL;
} }
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) { static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {

View File

@ -520,9 +520,8 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
// duplicated ts row does not involve in the interpolation of end value for current time window // duplicated ts row does not involve in the interpolation of end value for current time window
int32_t x = endRowIndex; int32_t x = endRowIndex;
while(x >= 0) { while(x > 0) {
if (tsCols[x] == tsCols[x-1]) { if (tsCols[x] == tsCols[x-1]) {
x -= 1; x -= 1;
} else { } else {
endRowIndex = x; endRowIndex = x;

View File

@ -668,6 +668,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DATA, "Invalid data format
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_DB_CONF, "Invalid schemaless db config")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_NOT_SAME_TYPE, "Not the same type like before") TAOS_DEFINE_ERROR(TSDB_CODE_SML_NOT_SAME_TYPE, "Not the same type like before")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INTERNAL_ERROR, "Internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INTERNAL_ERROR, "Internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_SML_NOT_SUPPORT_PK, "Can not insert data into table with primary key")
//tsma //tsma
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INIT_FAILED, "Tsma init failed") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INIT_FAILED, "Tsma init failed")

View File

@ -1841,12 +1841,116 @@ int sml_td18789_Test() {
return code; return code;
} }
int sml_td29373_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "drop database if exists td29373");
taos_free_result(pRes);
pRes = taos_query(taos, "create database if not exists td29373");
taos_free_result(pRes);
pRes = taos_query(taos, "use td29373");
taos_free_result(pRes);
pRes = taos_query(taos, "create table pktable (ts timestamp, f1 int primary key, f2 binary(10)) tags (t1 int)");
taos_free_result(pRes);
// case 1
const char *sql[] = {
"pktable,t1=1 f1=283i32,f2=b\"hello\" 1632299372000",
"pktable,t1=2 f1=232i32,f2=b\"he3llo\" 1632299373000",
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
taos_free_result(pRes);
// case 2
const char *sql1[] = {
"pktable,t1=2 f2=b\"he3llo\",f1=232i32 1632299373000",
"pktable,t1=1 f1=283i32,f2=b\"hello\" 1632299372000"
};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
taos_free_result(pRes);
// case 3
pRes = taos_query(taos, "create table pktablejson (ts timestamp, f1 int primary key, f2 binary(10)) tags (`host` varchar(8), dc varchar(8))");
taos_free_result(pRes);
const char *sql2[] = { ""
"[\n"
" {\n"
" \"metric\": \"pktablejson\",\n"
" \"timestamp\": 1346846400001,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
" {\n"
" \"metric\": \"pktablejson\",\n"
" \"timestamp\": 1346846400002,\n"
" \"value\": 9,\n"
" \"tags\": {\n"
" \"host\": \"web02\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" }\n"
"]"
};
char *sql3[1] = {0};
for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024);
strncpy(sql3[i], sql2[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
taos_free_result(pRes);
// case 4
const char *sql4[] = {
"pktablejson 1479496100 1.3E0 host=web01 dc=eth0",
"pktablejson 1479496100 1.2E0 dc=web01 host=eth0",
};
pRes = taos_schemaless_insert(taos, (char **)sql4, sizeof(sql4) / sizeof(sql4[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code == TSDB_CODE_SML_NOT_SUPPORT_PK);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
if (argc == 2) { if (argc == 2) {
taos_options(TSDB_OPTION_CONFIGDIR, argv[1]); taos_options(TSDB_OPTION_CONFIGDIR, argv[1]);
} }
int ret = 0; int ret = 0;
ret = sml_td29373_Test();
ASSERT(ret);
ret = sml_td24559_Test(); ret = sml_td24559_Test();
ASSERT(!ret); ASSERT(!ret);
ret = sml_td18789_Test(); ret = sml_td18789_Test();