feat: insert from query
This commit is contained in:
parent
0d4fb5bb80
commit
d625cc2e47
|
@ -246,8 +246,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
|||
|
||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||
|
||||
SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId);
|
||||
|
||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
||||
}
|
||||
|
|
|
@ -665,6 +665,7 @@ typedef struct {
|
|||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t sversion;
|
||||
int32_t tversion;
|
||||
int64_t affectedRows;
|
||||
} SQueryTableRsp;
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
|
||||
|
|
|
@ -100,7 +100,7 @@ void dsEndPut(DataSinkHandle handle, uint64_t useconds);
|
|||
* @param handle
|
||||
* @param pLen data length
|
||||
*/
|
||||
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
|
||||
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd);
|
||||
|
||||
/**
|
||||
* Get data, the caller needs to allocate data memory.
|
||||
|
|
|
@ -2106,75 +2106,3 @@ const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRow
|
|||
return pStart;
|
||||
}
|
||||
|
||||
|
||||
SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
int32_t sz = taosArrayGetSize(pBlocks);
|
||||
|
||||
// cal size
|
||||
int32_t cap = sizeof(SSubmitReq);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
// TODO min
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
|
||||
cap += sizeof(SSubmitBlk) + rows * maxLen;
|
||||
}
|
||||
|
||||
// assign data
|
||||
// TODO
|
||||
ret = rpcMallocCont(cap);
|
||||
ret->header.vgId = vgId;
|
||||
ret->version = htonl(1);
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
|
||||
blkHead->numOfRows = htons(pDataBlock->info.rows);
|
||||
blkHead->sversion = htonl(pTSchema->version);
|
||||
// TODO
|
||||
blkHead->suid = htobe64(suid);
|
||||
blkHead->uid = htobe64(uid);
|
||||
blkHead->schemaLen = htonl(0);
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t dataLen = 0;
|
||||
STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, pTSchema->version);
|
||||
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||
tdSRowResetBuf(&rb, rowData);
|
||||
|
||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||
const STColumn* pColumn = &pTSchema->columns[k];
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
if (colDataIsNull_s(pColData, j)) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
|
||||
} else {
|
||||
void* data = colDataGetData(pColData, j);
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||
}
|
||||
}
|
||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
dataLen += rowLen;
|
||||
}
|
||||
|
||||
blkHead->dataLen = htonl(dataLen);
|
||||
|
||||
ret->length += sizeof(SSubmitBlk) + dataLen;
|
||||
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
|
||||
}
|
||||
|
||||
ret->length = htonl(ret->length);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -247,7 +247,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
case TDMT_SCH_FETCH:
|
||||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_SCH_FETCH_RSP:
|
||||
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_SCH_CANCEL_TASK:
|
||||
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_SCH_DROP_TASK:
|
||||
|
|
|
@ -136,6 +136,77 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
|
|||
}
|
||||
|
||||
|
||||
SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId) {
|
||||
SSubmitReq* ret = NULL;
|
||||
int32_t sz = taosArrayGetSize(pBlocks);
|
||||
|
||||
// cal size
|
||||
int32_t cap = sizeof(SSubmitReq);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
// TODO min
|
||||
int32_t rowSize = pDataBlock->info.rowSize;
|
||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||
|
||||
cap += sizeof(SSubmitBlk) + rows * maxLen;
|
||||
}
|
||||
|
||||
// assign data
|
||||
// TODO
|
||||
ret = rpcMallocCont(cap);
|
||||
ret->header.vgId = vgId;
|
||||
ret->version = htonl(1);
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
|
||||
blkHead->numOfRows = htons(pDataBlock->info.rows);
|
||||
blkHead->sversion = htonl(pTSchema->version);
|
||||
// TODO
|
||||
blkHead->suid = htobe64(suid);
|
||||
blkHead->uid = htobe64(uid);
|
||||
blkHead->schemaLen = htonl(0);
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t dataLen = 0;
|
||||
STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
SRowBuilder rb = {0};
|
||||
tdSRowInit(&rb, pTSchema->version);
|
||||
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||
tdSRowResetBuf(&rb, rowData);
|
||||
|
||||
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||
const STColumn* pColumn = &pTSchema->columns[k];
|
||||
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||
if (colDataIsNull_s(pColData, j)) {
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
|
||||
} else {
|
||||
void* data = colDataGetData(pColData, j);
|
||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||
}
|
||||
}
|
||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||
dataLen += rowLen;
|
||||
}
|
||||
|
||||
blkHead->dataLen = htonl(dataLen);
|
||||
|
||||
ret->length += sizeof(SSubmitBlk) + dataLen;
|
||||
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
|
||||
}
|
||||
|
||||
ret->length = htonl(ret->length);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
|
||||
taosArrayPush(pInserter->pDataBlocks, pInput->pData);
|
||||
|
|
|
@ -56,7 +56,7 @@ void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
|
|||
return pHandleImpl->fEndPut(pHandleImpl, useconds);
|
||||
}
|
||||
|
||||
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
|
||||
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) {
|
||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
|
||||
}
|
||||
|
|
|
@ -139,6 +139,7 @@ typedef struct SQWTaskCtx {
|
|||
bool queryContinue;
|
||||
bool queryInQueue;
|
||||
int32_t rspCode;
|
||||
int64_t affectedRows; // for insert ...select stmt
|
||||
|
||||
SRpcHandleInfo ctrlConnInfo;
|
||||
SRpcHandleInfo dataConnInfo;
|
||||
|
|
|
@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
|
|||
int32_t code);
|
||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
|
||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
|
||||
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
|
||||
void qwFreeFetchRsp(void *msg);
|
||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||
|
|
|
@ -43,13 +43,16 @@ void qwFreeFetchRsp(void *msg) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) {
|
||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
|
||||
STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL;
|
||||
int64_t affectedRows = ctx ? ctx->affectedRows : 0;
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
pRsp->code = htonl(code);
|
||||
pRsp->affectedRows = htobe64(affectedRows);
|
||||
if (tbInfo) {
|
||||
strcpy(pRsp->tbFName, tbInfo->tbFName);
|
||||
pRsp->sversion = tbInfo->sversion;
|
||||
pRsp->tversion = tbInfo->tversion;
|
||||
pRsp->sversion = htonl(tbInfo->sversion);
|
||||
pRsp->tversion = htonl(tbInfo->tversion);
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
|
|
|
@ -57,6 +57,10 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
|||
connInfo.ahandle = NULL;
|
||||
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
|
||||
}
|
||||
|
||||
if (!ctx->needFetch) {
|
||||
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -184,7 +188,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
|
|||
}
|
||||
|
||||
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
||||
int32_t len = 0;
|
||||
int64_t len = 0;
|
||||
SRetrieveTableRsp *rsp = NULL;
|
||||
bool queryEnd = false;
|
||||
int32_t code = 0;
|
||||
|
@ -243,7 +247,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
}
|
||||
|
||||
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SDeleteRes *pRes) {
|
||||
int32_t len = 0;
|
||||
int64_t len = 0;
|
||||
SVDeleteRsp rsp = {0};
|
||||
bool queryEnd = false;
|
||||
int32_t code = 0;
|
||||
|
@ -445,7 +449,7 @@ _return:
|
|||
}
|
||||
|
||||
if (rspConnection) {
|
||||
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx ? &ctx->tbInfo : NULL);
|
||||
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx);
|
||||
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
|
||||
}
|
||||
|
||||
|
|
|
@ -331,7 +331,7 @@ void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
|
|||
qwtTestSinkQueryEnd = true;
|
||||
}
|
||||
|
||||
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
|
||||
void qwtGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) {
|
||||
static int32_t in = 0;
|
||||
|
||||
if (in > 0) {
|
||||
|
|
Loading…
Reference in New Issue