Merge pull request #12190 from taosdata/feature/vnode_refact1

fix: data loss in automatically created tables
This commit is contained in:
Hongze Cheng 2022-05-07 11:52:54 +08:00 committed by GitHub
commit d1a1ba4afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 163 additions and 21 deletions

View File

@ -672,7 +672,6 @@ typedef struct {
SArray* pArray; // Array of SUseDbRsp SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp; } SUseDbBatchRsp;
int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp); void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp);
@ -685,7 +684,6 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp
int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp); int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp);
void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp); void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
} SCompactDbReq; } SCompactDbReq;
@ -1554,7 +1552,9 @@ typedef struct SVDropStbReq {
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq); int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq); int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq);
#define TD_CREATE_IF_NOT_EXISTS 0x1
typedef struct SVCreateTbReq { typedef struct SVCreateTbReq {
int32_t flags;
tb_uid_t uid; tb_uid_t uid;
int64_t ctime; int64_t ctime;
const char* name; const char* name;

View File

@ -1255,7 +1255,6 @@ int32_t tDeserializeSGetUserAuthRspImpl(SCoder *pDecoder, SGetUserAuthRsp *pRsp)
return 0; return 0;
} }
int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) { int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) {
SCoder decoder = {0}; SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
@ -2091,7 +2090,7 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray); taosArrayDestroy(pRsp->pArray);
} }
int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp){ int32_t tSerializeSUserAuthBatchRsp(void *buf, int32_t bufLen, SUserAuthBatchRsp *pRsp) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
@ -2110,7 +2109,7 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp
return tlen; return tlen;
} }
int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp){ int32_t tDeserializeSUserAuthBatchRsp(void *buf, int32_t bufLen, SUserAuthBatchRsp *pRsp) {
SCoder decoder = {0}; SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
@ -2136,7 +2135,7 @@ int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchR
return 0; return 0;
} }
void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp){ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) {
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
for (int32_t i = 0; i < numOfBatch; ++i) { for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserAuthRsp *pUserAuthRsp = taosArrayGet(pRsp->pArray, i); SGetUserAuthRsp *pUserAuthRsp = taosArrayGet(pRsp->pArray, i);
@ -2146,7 +2145,6 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp){
taosArrayDestroy(pRsp->pArray); taosArrayDestroy(pRsp->pArray);
} }
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) { int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
@ -3746,6 +3744,7 @@ STSchema *tdGetSTSChemaFromSSChema(SSchema **pSchema, int32_t nCols) {
int tEncodeSVCreateTbReq(SCoder *pCoder, const SVCreateTbReq *pReq) { int tEncodeSVCreateTbReq(SCoder *pCoder, const SVCreateTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
if (tEncodeI64(pCoder, pReq->uid) < 0) return -1; if (tEncodeI64(pCoder, pReq->uid) < 0) return -1;
if (tEncodeI64(pCoder, pReq->ctime) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctime) < 0) return -1;
@ -3771,6 +3770,7 @@ int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->ctime) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctime) < 0) return -1;

View File

@ -196,7 +196,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
int8_t type; int8_t type;
int64_t ctime; int64_t ctime;
tb_uid_t suid; tb_uid_t suid;
int c, ret; int c = 0, ret;
// search & delete the name idx // search & delete the name idx
tdbDbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn); tdbDbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);

View File

@ -15,10 +15,11 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct SMemTable SMemTable; typedef struct SMemTable SMemTable;
typedef struct SMemData SMemData; typedef struct SMemData SMemData;
typedef struct SMemSkipList SMemSkipList; typedef struct SMemSkipList SMemSkipList;
typedef struct SMemSkipListCfg SMemSkipListCfg; typedef struct SMemSkipListNode SMemSkipListNode;
typedef struct SMemSkipListCurosr SMemSkipListCurosr;
struct SMemTable { struct SMemTable {
STsdb *pTsdb; STsdb *pTsdb;
@ -32,15 +33,17 @@ struct SMemTable {
SMemData **pBuckets; SMemData **pBuckets;
}; };
struct SMemSkipListCfg { struct SMemSkipListNode {
int8_t maxLevel; int8_t level;
int32_t nKey; SMemSkipListNode *forwards[];
int32_t nData;
}; };
struct SMemSkipList { struct SMemSkipList {
int8_t level; uint32_t seed;
uint32_t seed; int8_t maxLevel;
int8_t level;
int32_t size;
SMemSkipListNode pHead[];
}; };
struct SMemData { struct SMemData {
@ -55,6 +58,20 @@ struct SMemData {
SMemSkipList sl; SMemSkipList sl;
}; };
struct SMemSkipListCurosr {
SMemSkipList *pSl;
SMemSkipListNode *pNodeC;
};
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_HEAD_NODE(sl) ((sl)->pHead)
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
// SMemTable // SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
SMemTable *pMemTb = NULL; SMemTable *pMemTb = NULL;
@ -76,6 +93,7 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
pMemTb->pBuckets = taosMemoryCalloc(pMemTb->nBucket, sizeof(*pMemTb->pBuckets)); pMemTb->pBuckets = taosMemoryCalloc(pMemTb->nBucket, sizeof(*pMemTb->pBuckets));
if (pMemTb->pBuckets == NULL) { if (pMemTb->pBuckets == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pMemTb);
return -1; return -1;
} }
@ -83,8 +101,121 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
return 0; return 0;
} }
int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMT) { int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
// TODO if (pMemTb) {
// loop to destroy the contents (todo)
taosMemoryFree(pMemTb->pBuckets);
taosMemoryFree(pMemTb);
}
return 0;
}
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
SMemData *pMemData;
STsdb *pTsdb = pMemTb->pTsdb;
SVnode *pVnode = pTsdb->pVnode;
SVBufPool *pPool = pVnode->inUse;
int32_t hash;
int32_t tlen;
uint8_t buf[16];
int32_t rlen;
const uint8_t *p;
SMemSkipListNode *pSlNode;
const STSRow *pTSRow;
SMemSkipListCurosr slc = {0};
// search hash
hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket;
for (pMemData = pMemTb->pBuckets[hash]; pMemData; pMemData = pMemData->pHashNext) {
if (pMemData->suid == pSubmitBlk->suid && pMemData->uid == pSubmitBlk->uid) break;
}
// create pMemData if need
if (pMemData == NULL) {
int8_t maxLevel = pVnode->config.tsdbCfg.slLevel;
int32_t tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
SMemSkipListNode *pHead, *pTail;
pMemData = vnodeBufPoolMalloc(pPool, tsize);
if (pMemData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMemData->pHashNext = NULL;
pMemData->suid = pSubmitBlk->suid;
pMemData->uid = pSubmitBlk->uid;
pMemData->minKey = TSKEY_MAX;
pMemData->maxKey = TSKEY_MIN;
pMemData->minVer = -1;
pMemData->maxVer = -1;
pMemData->nRows = 0;
pMemData->sl.seed = taosRand();
pMemData->sl.maxLevel = maxLevel;
pMemData->sl.level = 0;
pMemData->sl.size = 0;
pHead = SL_HEAD_NODE(&pMemData->sl);
pTail = SL_TAIL_NODE(&pMemData->sl);
pHead->level = maxLevel;
pTail->level = maxLevel;
for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
SL_NODE_FORWARD(pHead, iLevel) = pTail;
SL_NODE_FORWARD(pTail, iLevel) = pHead;
}
// add to MemTable
hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket;
pMemData->pHashNext = pMemTb->pBuckets[hash];
pMemTb->pBuckets[hash] = pMemData;
pMemTb->nHash++;
}
// loop to insert data to skiplist
#if 0
tsdbMemSkipListCursorOpen(&slc, &pMemData->sl);
p = pSubmitBlk->pData;
for (;;) {
if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break;
const uint8_t *pt = p;
p = tGetBinary(p, &pTSRow, &rlen);
// check the row (todo)
// move the cursor to position to write (todo)
int32_t c;
tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
ASSERT(c);
// encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
pSlNode = vnodeBufPoolMalloc(pPool, tsize);
pSlNode->level = level;
uint8_t *pData = SL_NODE_DATA(pSlNode);
*(int64_t *)pData = version;
pData += sizeof(version);
memcpy(pData, pt, p - pt);
// insert row
tsdbMemSkipListCursorPut(&slc, pSlNode);
// update status
if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts;
if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts;
}
tsdbMemSkipListCursorClose(&slc);
#endif
if (pMemData->minVer == -1) pMemData->minVer = version;
if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey;
if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
if (pMemTb->minVer == -1) pMemTb->minVer = version;
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
return 0; return 0;
} }

View File

@ -360,7 +360,11 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
// do create table // do create table
if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) { if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
cRsp.code = terrno; if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
cRsp.code = TSDB_CODE_SUCCESS;
} else {
cRsp.code = terrno;
}
} else { } else {
cRsp.code = TSDB_CODE_SUCCESS; cRsp.code = TSDB_CODE_SUCCESS;
tsdbFetchTbUidList(pVnode->pTsdb, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); tsdbFetchTbUidList(pVnode->pTsdb, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
@ -529,6 +533,13 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
} }
msgIter.uid = createTbReq.uid;
if (createTbReq.type == TSDB_CHILD_TABLE) {
msgIter.suid = createTbReq.ctb.suid;
} else {
msgIter.suid = 0;
}
tCoderClear(&coder); tCoderClear(&coder);
} }