more
This commit is contained in:
parent
3f2fe1f645
commit
1aeb5a540d
|
@ -46,6 +46,7 @@ void tTSchemaDestroy(STSchema *pTSchema);
|
|||
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
|
||||
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)})
|
||||
|
||||
int32_t tTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow2 **ppRow);
|
||||
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
|
||||
void tTSRowFree(STSRow2 *pRow);
|
||||
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
|
||||
|
@ -66,7 +67,7 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
|
|||
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
|
||||
void tTagFree(STag *pTag);
|
||||
bool tTagGet(const STag *pTag, STagVal *pTagVal);
|
||||
char* tTagValToData(const STagVal *pTagVal, bool isJson);
|
||||
char *tTagValToData(const STagVal *pTagVal, bool isJson);
|
||||
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
|
||||
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
|
||||
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
|
||||
|
@ -153,7 +154,7 @@ struct STagVal {
|
|||
};
|
||||
int8_t type;
|
||||
union {
|
||||
int64_t i64;
|
||||
int64_t i64;
|
||||
struct {
|
||||
uint32_t nData;
|
||||
uint8_t *pData;
|
||||
|
@ -161,7 +162,7 @@ struct STagVal {
|
|||
};
|
||||
};
|
||||
|
||||
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
|
||||
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
|
||||
#define TD_TAG_LARGE ((int8_t)0x20)
|
||||
struct STag {
|
||||
int8_t flags;
|
||||
|
@ -421,4 +422,3 @@ int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToM
|
|||
#endif
|
||||
|
||||
#endif /*_TD_COMMON_DATA_FORMAT_H_*/
|
||||
|
||||
|
|
|
@ -953,25 +953,6 @@ static void debugPrintTagVal(int8_t type, const void *val, int32_t vlen, const c
|
|||
}
|
||||
}
|
||||
|
||||
// if (isLarge) {
|
||||
// p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag];
|
||||
// } else {
|
||||
// p = (uint8_t *)&pTag->idx[pTag->nTag];
|
||||
// }
|
||||
|
||||
// (*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal));
|
||||
// if (*ppArray == NULL) {
|
||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// goto _err;
|
||||
// }
|
||||
|
||||
// for (int16_t iTag = 0; iTag < pTag->nTag; iTag++) {
|
||||
// if (isLarge) {
|
||||
// offset = ((int16_t *)pTag->idx)[iTag];
|
||||
// } else {
|
||||
// offset = pTag->idx[iTag];
|
||||
// }
|
||||
|
||||
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
|
||||
int8_t isJson = pTag->flags & TD_TAG_JSON;
|
||||
int8_t isLarge = pTag->flags & TD_TAG_LARGE;
|
||||
|
|
|
@ -55,14 +55,10 @@ struct SMemTable {
|
|||
#define SL_MAX_LEVEL 5
|
||||
|
||||
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
|
||||
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
|
||||
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
|
||||
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
|
||||
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
|
||||
|
||||
#define SL_HEAD_FORWARD(sl, l) SL_NODE_FORWARD((sl)->pHead, l)
|
||||
#define SL_TAIL_BACKWARD(sl, l) SL_NODE_FORWARD((sl)->pTail, l)
|
||||
|
||||
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
|
||||
static int memDataPCmprFn(const void *p1, const void *p2);
|
||||
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
|
||||
|
@ -224,7 +220,7 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
|
|||
}
|
||||
|
||||
// create
|
||||
pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2);
|
||||
pMemData = vnodeBufPoolMalloc(pPool, sizeof(*pMemData) + SL_NODE_SIZE(maxLevel) * 2);
|
||||
if (pMemData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -239,11 +235,15 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
|
|||
pMemData->sl.maxLevel = maxLevel;
|
||||
pMemData->sl.level = 0;
|
||||
pMemData->sl.pHead = (SMemSkipListNode *)&pMemData[1];
|
||||
pMemData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pMemData->sl.pHead, SL_NODE_HALF_SIZE(maxLevel));
|
||||
pMemData->sl.pTail = (SMemSkipListNode *)POINTER_SHIFT(pMemData->sl.pHead, SL_NODE_SIZE(maxLevel));
|
||||
pMemData->sl.pHead->level = maxLevel;
|
||||
pMemData->sl.pTail->level = maxLevel;
|
||||
|
||||
for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) {
|
||||
SL_HEAD_FORWARD(&pMemData->sl, iLevel) = pMemData->sl.pTail;
|
||||
SL_TAIL_BACKWARD(&pMemData->sl, iLevel) = pMemData->sl.pHead;
|
||||
SL_NODE_FORWARD(pMemData->sl.pHead, iLevel) = pMemData->sl.pTail;
|
||||
SL_NODE_BACKWARD(pMemData->sl.pHead, iLevel) = NULL;
|
||||
SL_NODE_BACKWARD(pMemData->sl.pTail, iLevel) = pMemData->sl.pHead;
|
||||
SL_NODE_FORWARD(pMemData->sl.pTail, iLevel) = NULL;
|
||||
}
|
||||
|
||||
if (idx < 0) idx = 0;
|
||||
|
@ -315,30 +315,12 @@ static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
|
|||
int c;
|
||||
|
||||
if (isForward) {
|
||||
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
|
||||
if (iLevel < pMemData->sl.level) {
|
||||
SMemSkipListNode *px = pos[iLevel];
|
||||
SMemSkipListNode *p = SL_NODE_FORWARD(px, iLevel);
|
||||
|
||||
while (p != pMemData->sl.pTail) {
|
||||
pKey = (TSDBKEY *)SL_NODE_DATA(p);
|
||||
|
||||
c = tsdbKeyCmprFn(pKey, pRow);
|
||||
if (c >= 0) {
|
||||
break;
|
||||
} else {
|
||||
px = p;
|
||||
p = SL_NODE_FORWARD(px, iLevel);
|
||||
}
|
||||
}
|
||||
|
||||
pos[iLevel] = px;
|
||||
}
|
||||
}
|
||||
// TODO
|
||||
} else {
|
||||
SMemSkipListNode *px = pMemData->sl.pTail;
|
||||
|
||||
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
|
||||
if (iLevel < pMemData->sl.level) {
|
||||
SMemSkipListNode *px = pos[iLevel];
|
||||
SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel);
|
||||
|
||||
while (p != pMemData->sl.pHead) {
|
||||
|
@ -359,6 +341,36 @@ static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
|
|||
}
|
||||
}
|
||||
|
||||
static void memMovePosFrom(SMemData *pMemData, SMemSkipListNode *pNode, TSDBROW *pRow, int8_t isForward,
|
||||
SMemSkipListNode **pos) {
|
||||
SMemSkipListNode *px = pNode;
|
||||
TSDBKEY *pKey;
|
||||
SMemSkipListNode *p;
|
||||
int c;
|
||||
|
||||
if (isForward) {
|
||||
} else {
|
||||
ASSERT(pNode != pMemData->sl.pHead);
|
||||
|
||||
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
|
||||
p = SL_NODE_BACKWARD(px, iLevel);
|
||||
while (p != pMemData->sl.pHead) {
|
||||
pKey = (TSDBKEY *)SL_NODE_DATA(p);
|
||||
|
||||
c = tsdbKeyCmprFn(pKey, pRow);
|
||||
if (c <= 0) {
|
||||
break;
|
||||
} else {
|
||||
px = p;
|
||||
p = SL_NODE_BACKWARD(px, iLevel);
|
||||
}
|
||||
}
|
||||
|
||||
pos[iLevel] = px;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
|
||||
SMemSkipListNode **pos) {
|
||||
int32_t code = 0;
|
||||
|
@ -381,128 +393,4 @@ static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow
|
|||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
#if 0 //====================================================================================
|
||||
|
||||
|
||||
// SMemTable ========================
|
||||
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
|
||||
SMemData *pMemData;
|
||||
STsdb *pTsdb = pMemTb->pTsdb;
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
SVBufPool *pPool = pVnode->inUse;
|
||||
tb_uid_t suid = pSubmitBlk->suid;
|
||||
tb_uid_t uid = pSubmitBlk->uid;
|
||||
int32_t iBucket;
|
||||
|
||||
// search SMemData by hash
|
||||
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
|
||||
for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
|
||||
if (pMemData->suid == suid && pMemData->uid == uid) break;
|
||||
}
|
||||
|
||||
// create pMemData if need
|
||||
if (pMemData == NULL) {
|
||||
int8_t maxLevel = pVnode->config.tsdbCfg.slLevel;
|
||||
int32_t tsize = sizeof(*pMemData) + SL_NODE_HALF_SIZE(maxLevel) * 2;
|
||||
SMemSkipListNode *pHead, *pTail;
|
||||
|
||||
pMemData = vnodeBufPoolMalloc(pPool, tsize);
|
||||
if (pMemData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pMemData->pHashNext = NULL;
|
||||
pMemData->suid = suid;
|
||||
pMemData->uid = uid;
|
||||
pMemData->minKey = TSKEY_MAX;
|
||||
pMemData->maxKey = TSKEY_MIN;
|
||||
pMemData->minVer = -1;
|
||||
pMemData->maxVer = -1;
|
||||
pMemData->nRows = 0;
|
||||
pMemData->sl.seed = taosRand();
|
||||
pMemData->sl.maxLevel = maxLevel;
|
||||
pMemData->sl.level = 0;
|
||||
pMemData->sl.size = 0;
|
||||
pHead = SL_HEAD_NODE(&pMemData->sl);
|
||||
pTail = SL_TAIL_NODE(&pMemData->sl);
|
||||
pHead->level = maxLevel;
|
||||
pTail->level = maxLevel;
|
||||
for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
|
||||
SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
|
||||
SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
|
||||
}
|
||||
|
||||
// add to hash
|
||||
if (pMemTb->nHash >= pMemTb->nBucket) {
|
||||
// rehash (todo)
|
||||
}
|
||||
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
|
||||
pMemData->pHashNext = pMemTb->pBuckets[iBucket];
|
||||
pMemTb->pBuckets[iBucket] = pMemData;
|
||||
pMemTb->nHash++;
|
||||
|
||||
// sort organize (todo)
|
||||
}
|
||||
|
||||
// do insert data to SMemData
|
||||
SMemSkipListNode *forwards[SL_MAX_LEVEL];
|
||||
SMemSkipListNode *pNode;
|
||||
int32_t iRow;
|
||||
STsdbRow tRow = {.version = version};
|
||||
SEncoder ec = {0};
|
||||
SDecoder dc = {0};
|
||||
|
||||
tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData);
|
||||
tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl);
|
||||
for (iRow = 0;; iRow++) {
|
||||
if (tDecodeIsEnd(&dc)) break;
|
||||
|
||||
// decode row
|
||||
if (tDecodeBinary(&dc, (uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// move cursor
|
||||
tsdbMemSkipListCursorMoveTo(pMemTb->pSlc, version, tRow.pRow->ts, 0);
|
||||
|
||||
// encode row
|
||||
pNode = tsdbMemSkipListNodeCreate(pPool, &pMemData->sl, &tRow);
|
||||
if (pNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// put the node
|
||||
tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode);
|
||||
|
||||
// update status
|
||||
if (tRow.pRow->ts < pMemData->minKey) pMemData->minKey = tRow.pRow->ts;
|
||||
if (tRow.pRow->ts > pMemData->maxKey) pMemData->maxKey = tRow.pRow->ts;
|
||||
}
|
||||
tDecoderClear(&dc);
|
||||
|
||||
// update status
|
||||
if (pMemData->minVer == -1) pMemData->minVer = version;
|
||||
if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
|
||||
|
||||
if (pMemTb->minKey < pMemData->minKey) pMemTb->minKey = pMemData->minKey;
|
||||
if (pMemTb->maxKey < pMemData->maxKey) pMemTb->maxKey = pMemData->maxKey;
|
||||
if (pMemTb->minVer == -1) pMemTb->minVer = version;
|
||||
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) {
|
||||
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
|
||||
pSlc->pSl = pSl;
|
||||
// for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
|
||||
// pSlc->forwards[iLevel] = pHead;
|
||||
// }
|
||||
}
|
||||
|
||||
#endif
|
||||
}
|
Loading…
Reference in New Issue