fix: ci cases

This commit is contained in:
Hongze Cheng 2024-12-12 16:38:55 +08:00
parent 01ceb2e90b
commit 9ce7d0e986
3 changed files with 305 additions and 48 deletions

View File

@ -109,7 +109,7 @@ int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry) {
return code;
}
static int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry) {
int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry) {
int32_t code = TSDB_CODE_SUCCESS;
void *value = NULL;
int32_t valueSize = 0;
@ -1309,6 +1309,7 @@ static int32_t metaHandleNormalTableUpdate(SMeta *pMeta, const SMetaEntry *pEntr
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, pEntry->uid, pEntry->ntbEntry.schemaRow.version);
}
metaTimeSeriesNotifyCheck(pMeta);
metaFetchEntryFree(&pOldEntry);
return code;
}

View File

@ -43,23 +43,21 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) {
int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress) {
int32_t nCols = pWp->nCols;
int32_t ver = pWp->version;
if (add) {
SColCmpr *p = taosMemoryCalloc(1, sizeof(SColCmpr) * (nCols + 1));
SColCmpr *p = taosMemoryRealloc(pWp->pColCmpr, sizeof(SColCmpr) * (nCols + 1));
if (p == NULL) {
return terrno;
}
memcpy(p, pWp->pColCmpr, sizeof(SColCmpr) * nCols);
pWp->pColCmpr = p;
SColCmpr *pCol = p + nCols;
pCol->id = pSchema->colId;
pCol->alg = compress;
pWp->nCols = nCols + 1;
pWp->version = ver;
pWp->pColCmpr = p;
} else {
for (int32_t i = 0; i < nCols; i++) {
SColCmpr *pOCmpr = &pWp->pColCmpr[i];
@ -2989,8 +2987,9 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMeta
switch (pReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
return metaAddTableColumn(pMeta, version, pReq, pMetaRsp);
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
case TSDB_ALTER_TABLE_DROP_COLUMN:
// return metaDropTableColumn(pMeta, version, pReq, pMetaRsp);
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
return metaAlterTableColumn(pMeta, version, pReq, pMetaRsp);

View File

@ -18,7 +18,9 @@
extern int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry);
extern int32_t metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp);
extern int32_t metaFetchEntryByUid(SMeta *pMeta, int64_t uid, SMetaEntry **ppEntry);
extern int32_t metaFetchEntryByName(SMeta *pMeta, const char *name, SMetaEntry **ppEntry);
extern void metaFetchEntryFree(SMetaEntry **ppEntry);
extern int32_t updataTableColCmpr(SColCmprWrapper *pWp, SSchema *pSchema, int8_t add, uint32_t compress);
static int32_t metaCheckCreateSuperTableReq(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int32_t vgId = TD_VID(pMeta->pVnode);
@ -549,10 +551,9 @@ int32_t metaDropTable2(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
TAOS_RETURN(code);
}
int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) {
int32_t code = TSDB_CODE_SUCCESS;
static int32_t metaCheckAlterTableColumnReq(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
int32_t code = 0;
// check request
if (NULL == pReq->colName || strlen(pReq->colName) == 0) {
metaError("vgId:%d, %s failed at %s:%d since invalid column name:%s, version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, pReq->colName, version);
@ -595,19 +596,29 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST
__FILE__, __LINE__, tstrerror(code), version, pReq->tbName);
TAOS_RETURN(code);
}
TAOS_RETURN(code);
}
int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) {
int32_t code = TSDB_CODE_SUCCESS;
// check request
code = metaCheckAlterTableColumnReq(pMeta, version, pReq);
if (code) {
TAOS_RETURN(code);
}
// fetch old entry
SMetaEntry *pEntry = NULL;
code = metaFetchEntryByUid(pMeta, uid, &pEntry);
code = metaFetchEntryByName(pMeta, pReq->tbName, &pEntry);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " not found, version:%" PRId64,
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, version);
metaError("vgId:%d, %s failed at %s:%d since table %s not found, version:%" PRId64, TD_VID(pMeta->pVnode), __func__,
__FILE__, __LINE__, pReq->tbName, version);
TAOS_RETURN(code);
}
if (pEntry->version >= version) {
metaError("vgId:%d, %s failed at %s:%d since table %s uid %" PRId64 " version %" PRId64
" is not less than %" PRId64,
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, uid, pEntry->version, version);
metaError("vgId:%d, %s failed at %s:%d since table %s version %" PRId64 " is not less than %" PRId64,
TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, pReq->tbName, pEntry->version, version);
metaFetchEntryFree(&pEntry);
TAOS_RETURN(TSDB_CODE_INVALID_PARA);
}
@ -651,55 +662,301 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST
pColumn->flags = pReq->flags;
pColumn->colId = pEntry->ntbEntry.ncid++;
tstrncpy(pColumn->name, pReq->colName, TSDB_COL_NAME_LEN);
uint32_t compress = createDefaultColCmprByType(pColumn->type);
code = updataTableColCmpr(&pEntry->colCmpr, pColumn, 1, compress);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__,
__LINE__, tstrerror(code), version);
metaFetchEntryFree(&pEntry);
TAOS_RETURN(code);
}
// do handle entry
code = metaHandleEntry2(pMeta, pEntry);
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, tstrerror(code), uid, pReq->tbName, version);
__func__, __FILE__, __LINE__, tstrerror(code), pEntry->uid, pReq->tbName, version);
} else {
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->tbName, uid,
version);
metaInfo("vgId:%d, table %s uid %" PRId64 " is updated, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->tbName,
pEntry->uid, version);
}
metaFetchEntryFree(&pEntry);
if (metaUpdateMetaRsp(uid, pReq->tbName, pSchema, pRsp) < 0) {
if (metaUpdateMetaRsp(pEntry->uid, pReq->tbName, pSchema, pRsp) < 0) {
metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode),
__func__, __FILE__, __LINE__, tstrerror(code), uid, pReq->tbName, version);
__func__, __FILE__, __LINE__, tstrerror(code), pEntry->uid, pReq->tbName, version);
} else {
for (int32_t i = 0; i < pEntry->colCmpr.nCols; i++) {
SColCmpr *p = &pEntry->colCmpr.pColCmpr[i];
pRsp->pSchemaExt[i].colId = p->id;
pRsp->pSchemaExt[i].compress = p->alg;
}
}
metaFetchEntryFree(&pEntry);
TAOS_RETURN(code);
}
int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) {
int32_t code = TSDB_CODE_SUCCESS;
code = metaCheckAlterTableColumnReq(pMeta, version, pReq);
if (code) {
TAOS_RETURN(code);
}
// TODO
TAOS_RETURN(code);
#if 0
void *pVal = NULL;
int nVal = 0;
const void *pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SSchema *pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
bool freeColCmpr = false;
// search the column to add/drop/update
pSchema = &entry.ntbEntry.schemaRow;
// save old entry
SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid};
oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols;
int32_t rowLen = -1;
if (pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ||
pAlterTbReq->action == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES) {
rowLen = 0;
}
int32_t iCol = 0, jCol = 0;
SSchema *qColumn = NULL;
for (;;) {
qColumn = NULL;
if (jCol >= pSchema->nCols) break;
qColumn = &pSchema->pSchema[jCol];
if (!pColumn && (strcmp(qColumn->name, pAlterTbReq->colName) == 0)) {
pColumn = qColumn;
iCol = jCol;
if (rowLen < 0) break;
}
rowLen += qColumn->bytes;
++jCol;
}
entry.version = version;
int tlen;
SSchema *pNewSchema = NULL;
SSchema tScheam;
switch (pAlterTbReq->action) {
case TSDB_ALTER_TABLE_ADD_COLUMN:
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
if (pColumn) {
terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS;
goto _err;
}
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
goto _err;
}
if (rowLen + pAlterTbReq->bytes > TSDB_MAX_BYTES_PER_ROW) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
goto _err;
}
pSchema->version++;
pSchema->nCols++;
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
if (pNewSchema == NULL) {
goto _err;
}
memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1));
pSchema->pSchema = pNewSchema;
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].bytes = pAlterTbReq->bytes;
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type = pAlterTbReq->type;
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].flags = pAlterTbReq->flags;
pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId = entry.ntbEntry.ncid++;
tstrncpy(pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].name, pAlterTbReq->colName,
strlen(pAlterTbReq->colName) + 1);
++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
metaTimeSeriesNotifyCheck(pMeta);
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId;
int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type;
int32_t ret = tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
if (ret < 0) {
terrno = ret;
goto _err;
}
}
SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1];
uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type)
: pAlterTbReq->compress;
if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) {
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
entry.uid);
}
freeColCmpr = true;
if (entry.colCmpr.nCols != pSchema->nCols) {
if (pNewSchema) taosMemoryFree(pNewSchema);
if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr);
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
break;
case TSDB_ALTER_TABLE_DROP_COLUMN:
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
if (pColumn->colId == 0) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
}
bool hasPrimayKey = false;
if (pSchema->nCols >= 2) {
hasPrimayKey = pSchema->pSchema[1].flags & COL_IS_KEY ? true : false;
}
memcpy(&tScheam, pColumn, sizeof(SSchema));
pSchema->version++;
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
if (tlen) {
memmove(pColumn, pColumn + 1, tlen);
}
pSchema->nCols--;
--pMeta->pVnode->config.vndStats.numOfNTimeSeries;
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pColumn->colId;
if (tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, hasPrimayKey) != 0) {
metaError("vgId:%d, failed to drop ntable column:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
entry.uid);
}
}
if (updataTableColCmpr(&entry.colCmpr, &tScheam, 0, 0) != 0) {
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name,
entry.uid);
}
if (entry.colCmpr.nCols != pSchema->nCols) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes >= pAlterTbReq->colModBytes) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
if (rowLen + pAlterTbReq->colModBytes - pColumn->bytes > TSDB_MAX_BYTES_PER_ROW) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
goto _err;
}
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
}
pSchema->version++;
pColumn->bytes = pAlterTbReq->colModBytes;
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
if (pAlterTbReq->colNewName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
}
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
}
pSchema->version++;
tstrncpy(pColumn->name, pAlterTbReq->colNewName, strlen(pAlterTbReq->colNewName) + 1);
break;
}
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheInvalidateSchema(pMeta->pVnode->pTsdb, 0, entry.uid, pSchema->version);
}
entry.version = version;
// do actual write
metaWLock(pMeta);
if (metaDeleteNcolIdx(pMeta, &oldEntry) < 0) {
metaError("vgId:%d, failed to delete ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
if (metaUpdateNcolIdx(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to update ncol idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
// save to table db
if (metaSaveToTbDb(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to save to tb db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
if (metaUpdateUidIdx(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to update uid idx:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
if (metaSaveToSkmDb(pMeta, &entry) < 0) {
metaError("vgId:%d, failed to save to skm db:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
if (metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs) < 0) {
metaError("vgId:%d, failed to update change time:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
metaULock(pMeta);
if (metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp) < 0) {
metaError("vgId:%d, failed to update meta rsp:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
for (int32_t i = 0; i < entry.colCmpr.nCols; i++) {
SColCmpr *p = &entry.colCmpr.pColCmpr[i];
pMetaRsp->pSchemaExt[i].colId = p->id;
pMetaRsp->pSchemaExt[i].compress = p->alg;
}
if (entry.pBuf) taosMemoryFree(entry.pBuf);
if (pNewSchema) taosMemoryFree(pNewSchema);
if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
tDecoderClear(&dc);
return 0;
_err:
if (entry.pBuf) taosMemoryFree(entry.pBuf);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
tDecoderClear(&dc);
return terrno != 0 ? terrno : TSDB_CODE_FAILED;
#endif
TAOS_RETURN(code);
#if 0
SSchema *pCol = &pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1];
uint32_t compress = pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ? createDefaultColCmprByType(pCol->type)
: pAlterTbReq->compress;
if (updataTableColCmpr(&entry.colCmpr, pCol, 1, compress) != 0) {
metaError("vgId:%d, failed to update table col cmpr:%s uid:%" PRId64, TD_VID(pMeta->pVnode), entry.name, entry.uid);
}
freeColCmpr = true;
if (entry.colCmpr.nCols != pSchema->nCols) {
if (pNewSchema) taosMemoryFree(pNewSchema);
if (freeColCmpr) taosMemoryFree(entry.colCmpr.pColCmpr);
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
break;
#endif
}
int32_t metaDropTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) {
int32_t code = TSDB_CODE_SUCCESS;
// TODO
return code;
}
int32_t metaAlterTableColumnName(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, STableMetaRsp *pRsp) {