diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 8f8691cfc2..29af2bda67 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -61,14 +61,14 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* next; - SVnode* pVnode; - volatile int32_t nRef; - TdThreadSpinlock lock; - int64_t size; - uint8_t* ptr; - SVBufPoolNode* pTail; - SVBufPoolNode node; + SVBufPool* next; + SVnode* pVnode; + TdThreadSpinlock* lock; + volatile int32_t nRef; + int64_t size; + uint8_t* ptr; + SVBufPoolNode* pTail; + SVBufPoolNode node; }; int32_t vnodeOpenBufPool(SVnode* pVnode); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index fed02b9d99..654afe1b6a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -458,11 +458,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p } static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { - int8_t level = 1; - int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - const uint32_t factor = 4; + int8_t level = 1; + int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { + while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) { level++; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index a40b1667c5..755a551e20 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1015,6 +1015,191 @@ _err: return code; } +static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) { + int32_t code = 0; + + SColVal cv = {0}; + int32_t iColDataFrom = 0; + SColData *pColDataFrom = + (iColDataFrom < pBlockDataFrom->nColData) ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] : NULL; + + for (int32_t iColDataTo = 0; iColDataTo < pBlockData->nColData; iColDataTo++) { + SColData *pColDataTo = &((SColData *)pBlockData->aColData->pData)[iColDataTo]; + + while (pColDataFrom && pColDataFrom->cid < pColDataTo->cid) { + iColDataFrom++; + pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData) + ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] + : NULL; + } + + if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) { + code = tColDataAppendValue(pColDataTo, &COL_VAL_NONE(pColDataTo->cid, pColDataTo->type)); + if (code) goto _exit; + } else { + tColDataGetValue(pColDataFrom, iRow, &cv); + + code = tColDataAppendValue(pColDataTo, &cv); + if (code) goto _exit; + + iColDataFrom++; + pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData) + ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] + : NULL; + } + } + +_exit: + return code; +} + +static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) { + int32_t code = 0; + + int32_t iTColumn = 1; + STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + void *pBitmap = pRow->statis ? tdGetBitmapAddrTp(pRow, pTSchema->flen) : NULL; + + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { + SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData]; + + while (pTColumn && pTColumn->colId < pColData->cid) { + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + + if (pTColumn == NULL || pTColumn->colId > pColData->cid) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(pTColumn->type == pColData->type); + + SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; + + if (pRow->statis) { + TDRowValT vt = TD_VTYPE_MAX; + tdGetBitmapValTypeII(pBitmap, iTColumn - 1, &vt); + + if (vt == TD_VTYPE_NORM) { + cv.flag = CV_FLAG_VALUE; + + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + cv.value.nData = varDataLen(pData); + cv.value.pData = varDataVal(pData); + } else { + memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); + } + + code = tColDataAppendValue(pColData, &cv); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NONE) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NULL) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(0); + } + } else { + cv.flag = CV_FLAG_VALUE; + + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + cv.value.nData = varDataLen(pData); + cv.value.pData = varDataVal(pData); + } else { + memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); + } + + code = tColDataAppendValue(pColData, &cv); + if (code) goto _exit; + } + + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + } + +_exit: + return code; +} + +static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) { + int32_t code = 0; + + col_id_t kvIter = 0; + col_id_t nKvCols = tdRowGetNCols(pRow) - 1; + void *pColIdx = TD_ROW_COL_IDX(pRow); + void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow)); + int32_t iTColumn = 1; + STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { + SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData]; + + while (pTColumn && pTColumn->colId < pColData->cid) { + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + + if (pTColumn == NULL || pTColumn->colId > pColData->cid) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(pTColumn->type == pColData->type); + + SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; + TDRowValT vt = TD_VTYPE_NONE; // default is NONE + SKvRowIdx *pKvIdx = NULL; + + while (kvIter < nKvCols) { + pKvIdx = (SKvRowIdx *)POINTER_SHIFT(pColIdx, kvIter * sizeof(SKvRowIdx)); + if (pKvIdx->colId == pTColumn->colId) { + tdGetBitmapValTypeII(pBitmap, kvIter, &vt); + ++kvIter; + break; + } else if (pKvIdx->colId > pTColumn->colId) { + vt = TD_VTYPE_NONE; + break; + } else { + ++kvIter; + } + } + + if (vt == TD_VTYPE_NORM) { + cv.flag = CV_FLAG_VALUE; + + void *pData = POINTER_SHIFT(pRow, pKvIdx->offset); + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + cv.value.nData = varDataLen(pData); + cv.value.pData = varDataVal(pData); + } else { + memcpy(&cv.value.val, pData, pTColumn->bytes); + } + + code = tColDataAppendValue(pColData, &cv); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NONE) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NULL) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(0); + } + + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + } + +_exit: + return code; +} + int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t code = 0; @@ -1036,27 +1221,20 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS if (code) goto _err; pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); - // OTHER - SRowIter rIter = {0}; - SColVal *pColVal; - - tRowIterInit(&rIter, pRow, pTSchema); - pColVal = tRowIterNext(&rIter); - for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { - SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData]; - - while (pColVal && pColVal->cid < pColData->cid) { - pColVal = tRowIterNext(&rIter); - } - - if (pColVal == NULL || pColVal->cid > pColData->cid) { - code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + SColVal cv = {0}; + if (pRow->type == 0) { + if (TD_IS_TP_ROW(pRow->pTSRow)) { + code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema); + if (code) goto _err; + } else if (TD_IS_KV_ROW(pRow->pTSRow)) { + code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema); if (code) goto _err; } else { - code = tColDataAppendValue(pColData, pColVal); - if (code) goto _err; - pColVal = tRowIterNext(&rIter); + ASSERT(0); } + } else { + code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); + if (code) goto _err; } pBlockData->nRow++; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 6ac2ce1c16..71e926bd35 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -27,10 +27,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) return -1; } - if (taosThreadSpinInit(&pPool->lock, 0) != 0) { - taosMemoryFree(pPool); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (VND_IS_RSMA(pVnode)) { + pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); + if (!pPool->lock) { + taosMemoryFree(pPool); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (taosThreadSpinInit(pPool->lock, 0) != 0) { + taosMemoryFree((void*)pPool->lock); + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } else { + pPool->lock = NULL; } pPool->next = NULL; @@ -49,7 +60,10 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); - taosThreadSpinDestroy(&pPool->lock); + if (pPool->lock) { + taosThreadSpinDestroy(pPool->lock); + taosMemoryFree((void*)pPool->lock); + } taosMemoryFree(pPool); return 0; } @@ -114,7 +128,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *p = NULL; ASSERT(pPool != NULL); - taosThreadSpinLock(&pPool->lock); + if (pPool->lock) taosThreadSpinLock(pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node p = pPool->ptr; @@ -125,7 +139,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pNode = taosMemoryMalloc(sizeof(*pNode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - taosThreadSpinUnlock(&pPool->lock); + if (pPool->lock) taosThreadSpinUnlock(pPool->lock); return NULL; } @@ -138,7 +152,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - taosThreadSpinUnlock(&pPool->lock); + if (pPool->lock) taosThreadSpinUnlock(pPool->lock); return p; }