feat: insert from query
This commit is contained in:
parent
f3c8bcb953
commit
99d80ddc90
|
@ -38,6 +38,7 @@ typedef struct SDataInserterHandle {
|
||||||
SSubmitRes submitRes;
|
SSubmitRes submitRes;
|
||||||
SInserterParam* pParam;
|
SInserterParam* pParam;
|
||||||
SArray* pDataBlocks;
|
SArray* pDataBlocks;
|
||||||
|
SHashObj* pCols;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
bool queryEnd;
|
bool queryEnd;
|
||||||
uint64_t useconds;
|
uint64_t useconds;
|
||||||
|
@ -124,6 +125,7 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
|
||||||
int64_t uid = pInserter->pNode->tableId;
|
int64_t uid = pInserter->pNode->tableId;
|
||||||
int64_t suid = pInserter->pNode->stableId;
|
int64_t suid = pInserter->pNode->stableId;
|
||||||
int32_t vgId = pInserter->pNode->vgId;
|
int32_t vgId = pInserter->pNode->vgId;
|
||||||
|
bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);
|
||||||
|
|
||||||
SSubmitReq* ret = NULL;
|
SSubmitReq* ret = NULL;
|
||||||
int32_t sz = taosArrayGetSize(pBlocks);
|
int32_t sz = taosArrayGetSize(pBlocks);
|
||||||
|
@ -144,7 +146,7 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
|
||||||
// TODO
|
// TODO
|
||||||
ret = rpcMallocCont(cap);
|
ret = rpcMallocCont(cap);
|
||||||
ret->header.vgId = vgId;
|
ret->header.vgId = vgId;
|
||||||
ret->version = htonl(1);
|
ret->version = htonl(pTSchema->version);
|
||||||
ret->length = sizeof(SSubmitReq);
|
ret->length = sizeof(SSubmitReq);
|
||||||
ret->numOfBlocks = htonl(sz);
|
ret->numOfBlocks = htonl(sz);
|
||||||
|
|
||||||
|
@ -170,9 +172,20 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
|
||||||
|
|
||||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
const STColumn* pColumn = &pTSchema->columns[k];
|
const STColumn* pColumn = &pTSchema->columns[k];
|
||||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
SColumnInfoData* pColData = NULL;
|
||||||
|
int16_t colIdx = k;
|
||||||
|
if (!fullCol) {
|
||||||
|
int16_t *slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId));
|
||||||
|
if (NULL == slotId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
colIdx = *slotId;
|
||||||
|
}
|
||||||
|
|
||||||
|
pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
|
||||||
if (colDataIsNull_s(pColData, j)) {
|
if (colDataIsNull_s(pColData, j)) {
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
|
||||||
} else {
|
} else {
|
||||||
void* data = colDataGetData(pColData, j);
|
void* data = colDataGetData(pColData, j);
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||||
|
@ -285,6 +298,13 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||||
|
SNode* pNode = NULL;
|
||||||
|
FOREACH(pNode, pInserterNode->pCols) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
|
||||||
|
}
|
||||||
|
|
||||||
tsem_init(&inserter->ready, 0, 0);
|
tsem_init(&inserter->ready, 0, 0);
|
||||||
|
|
||||||
*pHandle = inserter;
|
*pHandle = inserter;
|
||||||
|
|
Loading…
Reference in New Issue