refactor: do some internal refactor.
This commit is contained in:
parent
cb26dd9fa2
commit
f582705feb
|
@ -186,7 +186,6 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
|
|||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
|
||||
|
||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
||||
void colDataTrim(SColumnInfoData* pColumnInfoData);
|
||||
|
||||
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
|
||||
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
|
||||
|
@ -206,7 +205,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
|||
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
|
||||
|
||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
|
@ -235,11 +233,10 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
|
|||
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
|
||||
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
|
||||
|
||||
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
|
||||
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
|
||||
|
||||
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
|
||||
// for debug
|
||||
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
|
||||
|
||||
|
@ -248,9 +245,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
|
|||
|
||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
||||
}
|
||||
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
|
|||
}
|
||||
}
|
||||
|
||||
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||
static int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
|
||||
} else {
|
||||
|
@ -42,10 +42,6 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
void colDataTrim(SColumnInfoData* pColumnInfoData) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
int32_t getJsonValueLen(const char* data) {
|
||||
int32_t dataLen = 0;
|
||||
if (*data == TSDB_DATA_TYPE_NULL) {
|
||||
|
@ -820,41 +816,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock,
|
||||
int32_t tupleIndex) {
|
||||
int32_t code = 0;
|
||||
size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = &pDstCols[i];
|
||||
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i);
|
||||
|
||||
if (pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg[i])) {
|
||||
code = colDataSetVal(pDst, numOfRows, NULL, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
char* p = colDataGetData(pSrc, tupleIndex);
|
||||
code = colDataSetVal(pDst, numOfRows, p, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
|
||||
#if 0
|
||||
for (int32_t i = 0; i < pDataBlock->info.rows; ++i) {
|
||||
int32_t code = doAssignOneTuple(pCols, i, pDataBlock, index[i]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
#else
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = &pCols[i];
|
||||
|
@ -879,7 +842,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
|
|||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1039,114 +1002,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
typedef struct SHelper {
|
||||
int32_t index;
|
||||
union {
|
||||
char* pData;
|
||||
int64_t i64;
|
||||
double d64;
|
||||
};
|
||||
} SHelper;
|
||||
|
||||
SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* pBlock) {
|
||||
int32_t sortValLengthPerRow = 0;
|
||||
int32_t numOfCols = taosArrayGetSize(pOrderInfo);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId);
|
||||
pInfo->pColData = pColInfo;
|
||||
sortValLengthPerRow += pColInfo->info.bytes;
|
||||
}
|
||||
|
||||
size_t len = sortValLengthPerRow * pBlock->info.rows;
|
||||
|
||||
char* buf = taosMemoryCalloc(1, len);
|
||||
SHelper* phelper = taosMemoryCalloc(numOfRows, sizeof(SHelper));
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
phelper[i].index = i;
|
||||
phelper[i].pData = buf + sortValLengthPerRow * i;
|
||||
}
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
|
||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||
phelper[j].i64 = *(int32_t*)pInfo->pColData->pData + pInfo->pColData->info.bytes * j;
|
||||
// memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j,
|
||||
// pInfo->pColData->info.bytes);
|
||||
}
|
||||
|
||||
offset += pInfo->pColData->info.bytes;
|
||||
}
|
||||
|
||||
taosMemoryFree(buf);
|
||||
return phelper;
|
||||
}
|
||||
|
||||
int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
|
||||
const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
|
||||
|
||||
SHelper* left = (SHelper*)p1;
|
||||
SHelper* right = (SHelper*)p2;
|
||||
|
||||
SArray* pInfo = pHelper->orderInfo;
|
||||
|
||||
int32_t offset = 0;
|
||||
int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset);
|
||||
int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset);
|
||||
|
||||
if (leftx == rightx) {
|
||||
return 0;
|
||||
} else {
|
||||
return (leftx < rightx) ? -1 : 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
|
||||
// Allocate the additional buffer.
|
||||
int64_t p0 = taosGetTimestampUs();
|
||||
|
||||
SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
|
||||
|
||||
uint32_t rows = pDataBlock->info.rows;
|
||||
SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock);
|
||||
if (index == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosqsort(index, rows, sizeof(SHelper), &helper, dataBlockCompar_rv);
|
||||
|
||||
int64_t p1 = taosGetTimestampUs();
|
||||
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
|
||||
if (pCols == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int64_t p2 = taosGetTimestampUs();
|
||||
|
||||
// int32_t code = blockDataAssign(pCols, pDataBlock, index);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// terrno = code;
|
||||
// return code;
|
||||
// }
|
||||
|
||||
int64_t p3 = taosGetTimestampUs();
|
||||
|
||||
copyBackToBlock(pDataBlock, pCols);
|
||||
int64_t p4 = taosGetTimestampUs();
|
||||
|
||||
printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
|
||||
p3 - p2, p4 - p3, rows);
|
||||
// destroyTupleIndex(index);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
void blockDataCleanup(SSDataBlock* pDataBlock) {
|
||||
blockDataEmpty(pDataBlock);
|
||||
SDataBlockInfo* pInfo = &pDataBlock->info;
|
||||
|
@ -1299,6 +1154,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// todo remove it
|
||||
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||
dst->info = src->info;
|
||||
dst->info.rows = 0;
|
||||
|
@ -1679,16 +1535,6 @@ static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
|
|||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
|
||||
memset(&pColInfoData->varmeta.offset[n], 0, total - n);
|
||||
} else { // reset the bitmap value
|
||||
/*int32_t stopIndex = BitmapLen(n) * 8;
|
||||
for(int32_t i = n; i < stopIndex; ++i) {
|
||||
colDataClearNull_f(pColInfoData->nullbitmap, i);
|
||||
}
|
||||
|
||||
int32_t remain = BitmapLen(total) - BitmapLen(n);
|
||||
if (remain > 0) {
|
||||
memset(pColInfoData->nullbitmap+BitmapLen(n), 0, remain);
|
||||
}*/
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1782,32 +1628,6 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
|
|||
return (void*)buf;
|
||||
}
|
||||
|
||||
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) {
|
||||
int32_t tlen = 0;
|
||||
int32_t sz = taosArrayGetSize(blocks);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pBlock = taosArrayGet(blocks, i);
|
||||
tlen += tEncodeDataBlock(buf, pBlock);
|
||||
}
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void* tDecodeDataBlocks(const void* buf, SArray** blocks) {
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
|
||||
*blocks = taosArrayInit(sz, sizeof(SSDataBlock));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock pBlock = {0};
|
||||
buf = tDecodeDataBlock(buf, &pBlock);
|
||||
taosArrayPush(*blocks, &pBlock);
|
||||
}
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
||||
time_t tt;
|
||||
int32_t ms = 0;
|
||||
|
@ -2060,182 +1880,6 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
return dumpBuf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief TODO: Assume that the final generated result it less than 3M
|
||||
*
|
||||
* @param pReq
|
||||
* @param pDataBlocks
|
||||
* @param vgId
|
||||
* @param suid
|
||||
*
|
||||
*/
|
||||
#if 0
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
|
||||
tb_uid_t suid) {
|
||||
int32_t bufSize = sizeof(SSubmitReq);
|
||||
int32_t sz = 1;
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
const SDataBlockInfo* pBlkInfo = &pDataBlock->info;
|
||||
|
||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum));
|
||||
bufSize += sizeof(SSubmitBlk);
|
||||
}
|
||||
|
||||
*pReq = taosMemoryCalloc(1, bufSize);
|
||||
if (!(*pReq)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
void* pDataBuf = *pReq;
|
||||
|
||||
int32_t msgLen = sizeof(SSubmitReq);
|
||||
int32_t numOfBlks = 0;
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, pTSchema->version);
|
||||
|
||||
for (int32_t i = 0; i < sz; ++i) {
|
||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
|
||||
if (colNum <= 1) {
|
||||
// invalid if only with TS col
|
||||
continue;
|
||||
}
|
||||
|
||||
if (rb.nCols != colNum) {
|
||||
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
|
||||
}
|
||||
|
||||
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
|
||||
pSubmitBlk->suid = suid;
|
||||
pSubmitBlk->uid = pDataBlock->info.id.groupId;
|
||||
pSubmitBlk->numOfRows = rows;
|
||||
pSubmitBlk->sversion = pTSchema->version;
|
||||
|
||||
msgLen += sizeof(SSubmitBlk);
|
||||
int32_t dataLen = 0;
|
||||
for (int32_t j = 0; j < rows; ++j) { // iterate by row
|
||||
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen)); // set row buf
|
||||
bool isStartKey = false;
|
||||
int32_t offset = 0;
|
||||
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
STColumn* pCol = &pTSchema->columns[k];
|
||||
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||
switch (pColInfoData->info.type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
if (!isStartKey) {
|
||||
isStartKey = true;
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
|
||||
offset, k);
|
||||
continue; // offset should keep 0 for next column
|
||||
|
||||
} else if (colDataIsNull_s(pColInfoData, j)) {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL,
|
||||
false, offset, k);
|
||||
} else {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
|
||||
true, offset, k);
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR: // TSDB_DATA_TYPE_BINARY
|
||||
case TSDB_DATA_TYPE_GEOMETRY: {
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NULL, NULL,
|
||||
false, offset, k);
|
||||
} else {
|
||||
void* data = colDataGetData(pColInfoData, j);
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, data,
|
||||
true, offset, k);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
|
||||
break;
|
||||
default:
|
||||
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
|
||||
if (colDataIsNull_s(pColInfoData, j)) {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NULL, NULL, false,
|
||||
offset, k);
|
||||
} else if (pCol->type == pColInfoData->info.type) {
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
|
||||
k);
|
||||
} else {
|
||||
char tv[8] = {0};
|
||||
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = 0;
|
||||
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
|
||||
int64_t v = 0;
|
||||
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
} else {
|
||||
uint64_t v = 0;
|
||||
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
|
||||
SET_TYPED_DATA(&tv, pCol->type, v);
|
||||
}
|
||||
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
|
||||
k);
|
||||
}
|
||||
} else {
|
||||
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
|
||||
}
|
||||
break;
|
||||
}
|
||||
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
|
||||
}
|
||||
tdSRowEnd(&rb);
|
||||
dataLen += TD_ROW_LEN(rb.pBuf);
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
tdSRowPrint(rb.pBuf, pTSchema, __func__);
|
||||
#endif
|
||||
}
|
||||
|
||||
++numOfBlks;
|
||||
|
||||
pSubmitBlk->dataLen = dataLen;
|
||||
msgLen += pSubmitBlk->dataLen;
|
||||
}
|
||||
|
||||
if (numOfBlks > 0) {
|
||||
(*pReq)->length = msgLen;
|
||||
|
||||
(*pReq)->header.vgId = htonl(vgId);
|
||||
(*pReq)->header.contLen = htonl(msgLen);
|
||||
(*pReq)->length = (*pReq)->header.contLen;
|
||||
(*pReq)->numOfBlocks = htonl(numOfBlks);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
|
||||
while (numOfBlks--) {
|
||||
int32_t dataLen = blk->dataLen;
|
||||
blk->uid = htobe64(blk->uid);
|
||||
blk->suid = htobe64(blk->suid);
|
||||
blk->sversion = htonl(blk->sversion);
|
||||
blk->dataLen = htonl(blk->dataLen);
|
||||
blk->schemaLen = htonl(blk->schemaLen);
|
||||
blk->numOfRows = htonl(blk->numOfRows);
|
||||
blk = (SSubmitBlk*)(blk->data + dataLen);
|
||||
}
|
||||
} else {
|
||||
// no valid rows
|
||||
taosMemoryFreeClear(*pReq);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
|
||||
int64_t uid, int32_t vgId, tb_uid_t suid) {
|
||||
SSubmitReq2* pReq = *ppReq;
|
||||
|
@ -2610,3 +2254,149 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
|||
ASSERT(pStart - pData == dataLen);
|
||||
return pStart;
|
||||
}
|
||||
|
||||
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
|
||||
// int32_t totalRows = pBlock->info.rows;
|
||||
int32_t bmLen = BitmapLen(totalRows);
|
||||
char* pBitmap = NULL;
|
||||
int32_t maxRows = 0;
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
// it is a reserved column for scalar function, and no data in this column yet.
|
||||
if (pDst->pData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfRows = 0;
|
||||
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
|
||||
int32_t j = 0;
|
||||
pDst->varmeta.length = 0;
|
||||
|
||||
while (j < totalRows) {
|
||||
if (pBoolList[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (colDataIsNull_var(pDst, j)) {
|
||||
colDataSetNull_var(pDst, numOfRows);
|
||||
} else {
|
||||
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2
|
||||
char* p1 = colDataGetVarData(pDst, j);
|
||||
int32_t len = 0;
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
len = getJsonValueLen(p1);
|
||||
} else {
|
||||
len = varDataTLen(p1);
|
||||
}
|
||||
char* p2 = taosMemoryMalloc(len);
|
||||
memcpy(p2, p1, len);
|
||||
colDataSetVal(pDst, numOfRows, p2, false);
|
||||
taosMemoryFree(p2);
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
|
||||
if (maxRows < numOfRows) {
|
||||
maxRows = numOfRows;
|
||||
}
|
||||
} else {
|
||||
if (pBitmap == NULL) {
|
||||
pBitmap = taosMemoryCalloc(1, bmLen);
|
||||
}
|
||||
|
||||
memcpy(pBitmap, pDst->nullbitmap, bmLen);
|
||||
memset(pDst->nullbitmap, 0, bmLen);
|
||||
|
||||
int32_t j = 0;
|
||||
|
||||
switch (pDst->info.type) {
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
while (j < totalRows) {
|
||||
if (pBoolList[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
while (j < totalRows) {
|
||||
if (pBoolList[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
while (j < totalRows) {
|
||||
if (pBoolList[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
while (j < totalRows) {
|
||||
if (pBoolList[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (maxRows < numOfRows) {
|
||||
maxRows = numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.rows = maxRows;
|
||||
if (pBitmap != NULL) {
|
||||
taosMemoryFree(pBitmap);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
||||
}
|
|
@ -540,151 +540,12 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
|
|||
}
|
||||
|
||||
int8_t* pIndicator = (int8_t*)p->pData;
|
||||
int32_t totalRows = pBlock->info.rows;
|
||||
|
||||
if (status == FILTER_RESULT_ALL_QUALIFIED) {
|
||||
// here nothing needs to be done
|
||||
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
|
||||
pBlock->info.rows = 0;
|
||||
} else {
|
||||
int32_t bmLen = BitmapLen(totalRows);
|
||||
char* pBitmap = NULL;
|
||||
int32_t maxRows = 0;
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
// it is a reserved column for scalar function, and no data in this column yet.
|
||||
if (pDst->pData == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfRows = 0;
|
||||
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
|
||||
int32_t j = 0;
|
||||
pDst->varmeta.length = 0;
|
||||
|
||||
while (j < totalRows) {
|
||||
if (pIndicator[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (colDataIsNull_var(pDst, j)) {
|
||||
colDataSetNull_var(pDst, numOfRows);
|
||||
} else {
|
||||
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2
|
||||
char* p1 = colDataGetVarData(pDst, j);
|
||||
int32_t len = 0;
|
||||
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
len = getJsonValueLen(p1);
|
||||
} else {
|
||||
len = varDataTLen(p1);
|
||||
}
|
||||
char* p2 = taosMemoryMalloc(len);
|
||||
memcpy(p2, p1, len);
|
||||
colDataSetVal(pDst, numOfRows, p2, false);
|
||||
taosMemoryFree(p2);
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
|
||||
if (maxRows < numOfRows) {
|
||||
maxRows = numOfRows;
|
||||
}
|
||||
} else {
|
||||
if (pBitmap == NULL) {
|
||||
pBitmap = taosMemoryCalloc(1, bmLen);
|
||||
}
|
||||
|
||||
memcpy(pBitmap, pDst->nullbitmap, bmLen);
|
||||
memset(pDst->nullbitmap, 0, bmLen);
|
||||
|
||||
int32_t j = 0;
|
||||
|
||||
switch (pDst->info.type) {
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
while (j < totalRows) {
|
||||
if (pIndicator[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
while (j < totalRows) {
|
||||
if (pIndicator[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
while (j < totalRows) {
|
||||
if (pIndicator[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
while (j < totalRows) {
|
||||
if (pIndicator[j] == 0) {
|
||||
j += 1;
|
||||
continue;
|
||||
}
|
||||
if (colDataIsNull_f(pBitmap, j)) {
|
||||
colDataSetNull_f(pDst->nullbitmap, numOfRows);
|
||||
} else {
|
||||
((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
|
||||
}
|
||||
numOfRows += 1;
|
||||
j += 1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (maxRows < numOfRows) {
|
||||
maxRows = numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.rows = maxRows;
|
||||
if (pBitmap != NULL) {
|
||||
taosMemoryFree(pBitmap);
|
||||
}
|
||||
trimDataBlock(pBlock, pBlock->info.rows, (bool*) pIndicator);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -485,12 +485,12 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
bool freeReader = false;
|
||||
|
||||
// backup the rows
|
||||
int32_t backupRows = pBlock->info.rows;
|
||||
pBlock->info.rows = rows;
|
||||
|
||||
bool freeReader = false;
|
||||
STableCachedVal val = {0};
|
||||
|
||||
SMetaReader mr = {0};
|
||||
|
@ -1553,13 +1553,13 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
|
||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||
|
||||
pInfo->pRes->info.rows = pBlock->info.rows;
|
||||
pInfo->pRes->info.id.uid = pBlock->info.id.uid;
|
||||
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||
pInfo->pRes->info.version = pBlock->info.version;
|
||||
pBlockInfo->rows = pBlock->info.rows;
|
||||
pBlockInfo->id.uid = pBlock->info.id.uid;
|
||||
pBlockInfo->type = STREAM_NORMAL;
|
||||
pBlockInfo->version = pBlock->info.version;
|
||||
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||
|
||||
// todo extract method
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||
|
@ -1589,7 +1589,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
// currently only the tbname pseudo column
|
||||
if (pInfo->numOfPseudoExpr > 0) {
|
||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
|
||||
pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
|
||||
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
|
@ -1606,7 +1606,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
// blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
|
||||
calBlockTbName(pInfo, pInfo->pRes);
|
||||
return 0;
|
||||
|
@ -2088,11 +2087,26 @@ FETCH_NEXT_BLOCK:
|
|||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
|
||||
// todo apply time window range filter
|
||||
|
||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
|
||||
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
|
||||
{ // do additional time window filter
|
||||
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
|
||||
|
||||
if (pWindow->skey != 0) {
|
||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
|
||||
p[i] = (*ts >= pWindow->skey);
|
||||
}
|
||||
|
||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||
|
||||
|
|
Loading…
Reference in New Issue