Merge branch 'refact/submit_req_taosx' into refact/submit_req
This commit is contained in:
commit
4dbc324564
|
@ -146,6 +146,9 @@ extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max,
|
||||||
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind);
|
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind);
|
||||||
void tColDataSortMerge(SArray *colDataArr);
|
void tColDataSortMerge(SArray *colDataArr);
|
||||||
|
|
||||||
|
//for raw block
|
||||||
|
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes,
|
||||||
|
int32_t nRows, char* lengthOrbitmap, char *data);
|
||||||
// for encode/decode
|
// for encode/decode
|
||||||
int32_t tPutColData(uint8_t *pBuf, SColData *pColData);
|
int32_t tPutColData(uint8_t *pBuf, SColData *pColData);
|
||||||
int32_t tGetColData(uint8_t *pBuf, SColData *pColData);
|
int32_t tGetColData(uint8_t *pBuf, SColData *pColData);
|
||||||
|
|
|
@ -115,6 +115,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
||||||
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
||||||
|
|
||||||
|
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq* pCreateTb);
|
||||||
|
|
||||||
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
||||||
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
||||||
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);
|
SArray* serializeVgroupsDropTableBatch(SHashObj* pVgroupHashmap);
|
||||||
|
|
|
@ -1201,16 +1201,6 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SVgroupInfo vg;
|
|
||||||
void* data;
|
|
||||||
} VgData;
|
|
||||||
|
|
||||||
static void destroyVgHash(void* data) {
|
|
||||||
VgData* vgData = (VgData*)data;
|
|
||||||
taosMemoryFreeClear(vgData->data);
|
|
||||||
}
|
|
||||||
|
|
||||||
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){
|
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD *fields, int numFields){
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
|
@ -1637,8 +1627,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
|
||||||
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1652,6 +1640,13 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
conn.requestObjRefId = pRequest->self;
|
conn.requestObjRefId = pRequest->self;
|
||||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
pQuery = smlInitHandle();
|
||||||
|
if(pQuery == NULL){
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
|
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
|
||||||
while (++rspObj.resIter < rspObj.rsp.blockNum) {
|
while (++rspObj.resIter < rspObj.rsp.blockNum) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
|
||||||
|
@ -1659,14 +1654,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
|
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
|
|
||||||
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
|
||||||
|
|
||||||
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("WriteRaw: setQueryResultFromRsp error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
||||||
if (!tbName) {
|
if (!tbName) {
|
||||||
|
@ -1680,13 +1667,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
strcpy(pName.dbname, pRequest->pDb);
|
strcpy(pName.dbname, pRequest->pDb);
|
||||||
strcpy(pName.tname, tbName);
|
strcpy(pName.tname, tbName);
|
||||||
|
|
||||||
VgData vgData = {0};
|
|
||||||
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
|
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
|
||||||
|
@ -1698,164 +1678,29 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t fLen = 0;
|
SVgroupInfo vg;
|
||||||
int32_t rowSize = 0;
|
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
|
||||||
int16_t nVar = 0;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
||||||
SSchema* schema = &pTableMeta->schema[i];
|
|
||||||
fLen += TYPE_BYTES[schema->type];
|
|
||||||
rowSize += schema->bytes;
|
|
||||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
|
||||||
nVar++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fLen -= sizeof(TSKEY);
|
|
||||||
|
|
||||||
int32_t rows = rspObj.resInfo.numOfRows;
|
|
||||||
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
|
||||||
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
|
|
||||||
int32_t schemaLen = 0;
|
|
||||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
|
||||||
|
|
||||||
SSubmitReq* subReq = NULL;
|
|
||||||
SSubmitBlk* blk = NULL;
|
|
||||||
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
|
||||||
if (hData) {
|
|
||||||
vgData = *(VgData*)hData;
|
|
||||||
|
|
||||||
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
|
||||||
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
|
|
||||||
if (tmp == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
vgData.data = tmp;
|
|
||||||
((VgData*)hData)->data = tmp;
|
|
||||||
subReq = (SSubmitReq*)(vgData.data);
|
|
||||||
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
|
||||||
} else {
|
|
||||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
|
||||||
void* tmp = taosMemoryCalloc(1, totalLen);
|
|
||||||
if (tmp == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
vgData.data = tmp;
|
|
||||||
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
|
|
||||||
subReq = (SSubmitReq*)(vgData.data);
|
|
||||||
subReq->length = sizeof(SSubmitReq);
|
|
||||||
subReq->numOfBlocks = 0;
|
|
||||||
|
|
||||||
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
|
|
||||||
}
|
|
||||||
|
|
||||||
// pSW->pSchema should be same as pTableMeta->schema
|
|
||||||
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
|
|
||||||
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
|
||||||
uint64_t uid = pTableMeta->uid;
|
|
||||||
int16_t sver = pTableMeta->sversion;
|
|
||||||
|
|
||||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
|
||||||
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
|
||||||
|
|
||||||
SRowBuilder rb = {0};
|
|
||||||
tdSRowInit(&rb, sver);
|
|
||||||
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
|
|
||||||
int32_t totalLen = 0;
|
|
||||||
|
|
||||||
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
for (int i = 0; i < pSW->nCols; i++) {
|
|
||||||
SSchema* schema = &pSW->pSchema[i];
|
|
||||||
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
|
||||||
tdSRowResetBuf(&rb, rowData);
|
|
||||||
|
|
||||||
doSetOneRowPtr(&rspObj.resInfo);
|
|
||||||
rspObj.resInfo.current += 1;
|
|
||||||
|
|
||||||
int32_t offset = 0;
|
|
||||||
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
|
|
||||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
|
||||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
|
||||||
if (!index) {
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
|
|
||||||
} else {
|
|
||||||
char* colData = rspObj.resInfo.row[*index];
|
|
||||||
if (!colData) {
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
|
||||||
} else {
|
|
||||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
|
||||||
colData -= VARSTR_HEADER_SIZE;
|
|
||||||
}
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
|
||||||
offset += TYPE_BYTES[pColumn->type];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tdSRowEnd(&rb);
|
|
||||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
|
||||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
|
||||||
totalLen += rowLen;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashCleanup(schemaHash);
|
|
||||||
blk->uid = htobe64(uid);
|
|
||||||
blk->suid = htobe64(suid);
|
|
||||||
blk->sversion = htonl(sver);
|
|
||||||
blk->schemaLen = htonl(schemaLen);
|
|
||||||
blk->numOfRows = htonl(rows);
|
|
||||||
blk->dataLen = htonl(totalLen);
|
|
||||||
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
|
|
||||||
subReq->numOfBlocks++;
|
|
||||||
taosMemoryFreeClear(pTableMeta);
|
|
||||||
rspObj.resInfo.pRspMsg = NULL;
|
|
||||||
doFreeReqResultInfo(&rspObj.resInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
|
||||||
if (NULL == pQuery) {
|
|
||||||
uError("create SQuery error");
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
|
||||||
pQuery->haveResultSet = false;
|
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
|
||||||
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
|
||||||
if (NULL == pQuery->pRoot) {
|
|
||||||
uError("create pQuery->pRoot error");
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
|
||||||
|
|
||||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
|
||||||
|
|
||||||
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
|
|
||||||
while (vData) {
|
|
||||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
|
||||||
if (NULL == dst) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
dst->vg = vData->vg;
|
|
||||||
SSubmitReq* subReq = (SSubmitReq*)(vData->data);
|
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
|
||||||
dst->numOfTables = subReq->numOfBlocks;
|
if (hData == NULL) {
|
||||||
dst->size = subReq->length;
|
taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
|
||||||
dst->pData = (char*)subReq;
|
}
|
||||||
vData->data = NULL; // no need free
|
|
||||||
subReq->header.vgId = htonl(dst->vg.vgId);
|
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, NULL);
|
||||||
subReq->version = htonl(1);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
subReq->header.contLen = htonl(subReq->length);
|
uError("WriteRaw:rawBlockBindData failed");
|
||||||
subReq->length = htonl(subReq->length);
|
goto end;
|
||||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
}
|
||||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
}
|
||||||
vData = (VgData*)taosHashIterate(pVgHash, vData);
|
|
||||||
|
code = smlBuildOutput(pQuery, pVgHash);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("smlBuildOutput failed");
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
@ -1863,8 +1708,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
|
|
||||||
end:
|
end:
|
||||||
tDeleteSMqDataRsp(&rspObj.rsp);
|
tDeleteSMqDataRsp(&rspObj.rsp);
|
||||||
rspObj.resInfo.pRspMsg = NULL;
|
|
||||||
doFreeReqResultInfo(&rspObj.resInfo);
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
|
@ -1907,8 +1750,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
|
||||||
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1922,6 +1763,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
conn.requestObjRefId = pRequest->self;
|
conn.requestObjRefId = pRequest->self;
|
||||||
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
pQuery = smlInitHandle();
|
||||||
|
if(pQuery == NULL){
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
|
printf("raw data block num:%d\n", rspObj.rsp.blockNum);
|
||||||
while (++rspObj.resIter < rspObj.rsp.blockNum) {
|
while (++rspObj.resIter < rspObj.rsp.blockNum) {
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter);
|
||||||
|
@ -1929,14 +1777,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
|
uError("WriteRaw:no schema, iter:%d", rspObj.resIter);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
|
|
||||||
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
|
||||||
|
|
||||||
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("WriteRaw: setQueryResultFromRsp error");
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
||||||
if (!tbName) {
|
if (!tbName) {
|
||||||
|
@ -1950,44 +1790,28 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
strcpy(pName.dbname, pRequest->pDb);
|
strcpy(pName.dbname, pRequest->pDb);
|
||||||
strcpy(pName.tname, tbName);
|
strcpy(pName.tname, tbName);
|
||||||
|
|
||||||
VgData vgData = {0};
|
|
||||||
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
// find schema data info
|
// find schema data info
|
||||||
int32_t schemaLen = 0;
|
SVCreateTbReq pCreateReq = {0};
|
||||||
void* schemaData = NULL;
|
|
||||||
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
|
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
|
||||||
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
|
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
|
||||||
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
|
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
|
||||||
|
|
||||||
SDecoder decoderTmp = {0};
|
SDecoder decoderTmp = {0};
|
||||||
SVCreateTbReq pCreateReq = {0};
|
|
||||||
|
|
||||||
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
|
||||||
|
memset(&pCreateReq, 0, sizeof(SVCreateTbReq));
|
||||||
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
|
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
|
||||||
tDecoderClear(&decoderTmp);
|
tDecoderClear(&decoderTmp);
|
||||||
taosMemoryFreeClear(pCreateReq.comment);
|
|
||||||
taosArrayDestroy(pCreateReq.ctb.tagName);
|
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pCreateReq.type == TSDB_CHILD_TABLE);
|
ASSERT(pCreateReq.type == TSDB_CHILD_TABLE);
|
||||||
if (strcmp(tbName, pCreateReq.name) == 0) {
|
if (strcmp(tbName, pCreateReq.name) == 0) {
|
||||||
schemaLen = *lenTmp;
|
|
||||||
schemaData = *dataTmp;
|
|
||||||
strcpy(pName.tname, pCreateReq.ctb.stbName);
|
strcpy(pName.tname, pCreateReq.ctb.stbName);
|
||||||
tDecoderClear(&decoderTmp);
|
tDecoderClear(&decoderTmp);
|
||||||
taosMemoryFreeClear(pCreateReq.comment);
|
|
||||||
taosArrayDestroy(pCreateReq.ctb.tagName);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoderTmp);
|
tDecoderClear(&decoderTmp);
|
||||||
taosMemoryFreeClear(pCreateReq.comment);
|
|
||||||
taosArrayDestroy(pCreateReq.ctb.tagName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
|
||||||
|
@ -2001,167 +1825,23 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t fLen = 0;
|
SVgroupInfo vg;
|
||||||
int32_t rowSize = 0;
|
code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg);
|
||||||
int16_t nVar = 0;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName);
|
||||||
SSchema* schema = &pTableMeta->schema[i];
|
goto end;
|
||||||
fLen += TYPE_BYTES[schema->type];
|
}
|
||||||
rowSize += schema->bytes;
|
|
||||||
if (IS_VAR_DATA_TYPE(schema->type)) {
|
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
|
||||||
nVar++;
|
if (hData == NULL) {
|
||||||
}
|
taosHashPut(pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
|
||||||
}
|
}
|
||||||
fLen -= sizeof(TSKEY);
|
|
||||||
|
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve, &pCreateReq);
|
||||||
int32_t rows = rspObj.resInfo.numOfRows;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
|
uError("WriteRaw:rawBlockBindData failed");
|
||||||
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
|
|
||||||
|
|
||||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
|
||||||
|
|
||||||
SSubmitReq* subReq = NULL;
|
|
||||||
SSubmitBlk* blk = NULL;
|
|
||||||
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
|
||||||
if (hData) {
|
|
||||||
vgData = *(VgData*)hData;
|
|
||||||
|
|
||||||
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
|
||||||
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
|
|
||||||
if (tmp == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
vgData.data = tmp;
|
|
||||||
((VgData*)hData)->data = tmp;
|
|
||||||
subReq = (SSubmitReq*)(vgData.data);
|
|
||||||
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
|
||||||
} else {
|
|
||||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
|
||||||
void* tmp = taosMemoryCalloc(1, totalLen);
|
|
||||||
if (tmp == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
vgData.data = tmp;
|
|
||||||
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
|
|
||||||
subReq = (SSubmitReq*)(vgData.data);
|
|
||||||
subReq->length = sizeof(SSubmitReq);
|
|
||||||
subReq->numOfBlocks = 0;
|
|
||||||
|
|
||||||
blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq));
|
|
||||||
}
|
|
||||||
|
|
||||||
// pSW->pSchema should be same as pTableMeta->schema
|
|
||||||
// ASSERT(pSW->nCols == pTableMeta->tableInfo.numOfColumns);
|
|
||||||
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
|
||||||
uint64_t uid = pTableMeta->uid;
|
|
||||||
int16_t sver = pTableMeta->sversion;
|
|
||||||
|
|
||||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
|
||||||
if (schemaData) {
|
|
||||||
memcpy(blkSchema, schemaData, schemaLen);
|
|
||||||
}
|
|
||||||
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
|
||||||
|
|
||||||
SRowBuilder rb = {0};
|
|
||||||
tdSRowInit(&rb, sver);
|
|
||||||
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
|
|
||||||
int32_t totalLen = 0;
|
|
||||||
|
|
||||||
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
for (int i = 0; i < pSW->nCols; i++) {
|
|
||||||
SSchema* schema = &pSW->pSchema[i];
|
|
||||||
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
|
||||||
tdSRowResetBuf(&rb, rowData);
|
|
||||||
|
|
||||||
doSetOneRowPtr(&rspObj.resInfo);
|
|
||||||
rspObj.resInfo.current += 1;
|
|
||||||
|
|
||||||
int32_t offset = 0;
|
|
||||||
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
|
|
||||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
|
||||||
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
|
|
||||||
if (!index) {
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
|
|
||||||
} else {
|
|
||||||
char* colData = rspObj.resInfo.row[*index];
|
|
||||||
if (!colData) {
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
|
||||||
} else {
|
|
||||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
|
||||||
colData -= VARSTR_HEADER_SIZE;
|
|
||||||
}
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
|
||||||
offset += TYPE_BYTES[pColumn->type];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tdSRowEnd(&rb);
|
|
||||||
int32_t rowLen = TD_ROW_LEN(rowData);
|
|
||||||
rowData = POINTER_SHIFT(rowData, rowLen);
|
|
||||||
totalLen += rowLen;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashCleanup(schemaHash);
|
|
||||||
blk->uid = htobe64(uid);
|
|
||||||
blk->suid = htobe64(suid);
|
|
||||||
blk->sversion = htonl(sver);
|
|
||||||
blk->schemaLen = htonl(schemaLen);
|
|
||||||
blk->numOfRows = htonl(rows);
|
|
||||||
blk->dataLen = htonl(totalLen);
|
|
||||||
subReq->length += sizeof(SSubmitBlk) + schemaLen + totalLen;
|
|
||||||
subReq->numOfBlocks++;
|
|
||||||
taosMemoryFreeClear(pTableMeta);
|
|
||||||
rspObj.resInfo.pRspMsg = NULL;
|
|
||||||
doFreeReqResultInfo(&rspObj.resInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
|
|
||||||
if (NULL == pQuery) {
|
|
||||||
uError("create SQuery error");
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
|
||||||
pQuery->haveResultSet = false;
|
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
|
||||||
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
|
||||||
if (NULL == pQuery->pRoot) {
|
|
||||||
uError("create pQuery->pRoot error");
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
|
||||||
|
|
||||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
|
||||||
|
|
||||||
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
|
|
||||||
while (vData) {
|
|
||||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
|
||||||
if (NULL == dst) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
dst->vg = vData->vg;
|
|
||||||
SSubmitReq* subReq = (SSubmitReq*)(vData->data);
|
|
||||||
dst->numOfTables = subReq->numOfBlocks;
|
|
||||||
dst->size = subReq->length;
|
|
||||||
dst->pData = (char*)subReq;
|
|
||||||
vData->data = NULL; // no need free
|
|
||||||
subReq->header.vgId = htonl(dst->vg.vgId);
|
|
||||||
subReq->version = htonl(1);
|
|
||||||
subReq->header.contLen = htonl(subReq->length);
|
|
||||||
subReq->length = htonl(subReq->length);
|
|
||||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
|
||||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
|
||||||
vData = (VgData*)taosHashIterate(pVgHash, vData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
@ -2169,8 +1849,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
||||||
|
|
||||||
end:
|
end:
|
||||||
tDeleteSTaosxRsp(&rspObj.rsp);
|
tDeleteSTaosxRsp(&rspObj.rsp);
|
||||||
rspObj.resInfo.pRspMsg = NULL;
|
|
||||||
doFreeReqResultInfo(&rspObj.resInfo);
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
|
|
|
@ -2091,6 +2091,61 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes,
|
||||||
|
int32_t nRows, char* lengthOrbitmap, char *data) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(type)) { // var-length data type
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
int32_t offset = *((int32_t*)lengthOrbitmap + i);
|
||||||
|
if (offset == -1) {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||||
|
if (code) goto _exit;
|
||||||
|
} else {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
|
||||||
|
pColData, (uint8_t *)varDataVal(data + offset), varDataLen(data + offset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else { // fixed-length data type
|
||||||
|
bool allValue = true;
|
||||||
|
bool allNull = true;
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
if(!colDataIsNull_f(lengthOrbitmap, i)){
|
||||||
|
allNull = false;
|
||||||
|
}else{
|
||||||
|
allValue = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allValue) {
|
||||||
|
// optimize (todo)
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
|
||||||
|
pColData, (uint8_t *)data + bytes * i, bytes);
|
||||||
|
}
|
||||||
|
} else if (allNull) {
|
||||||
|
// optimize (todo)
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t i = 0; i < nRows; ++i) {
|
||||||
|
if (colDataIsNull_f(lengthOrbitmap, i)) {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
|
||||||
|
if (code) goto _exit;
|
||||||
|
} else {
|
||||||
|
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](
|
||||||
|
pColData, (uint8_t *)data + bytes * i, bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) {
|
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "parUtil.h"
|
#include "parUtil.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "tRealloc.h"
|
#include "tRealloc.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
|
||||||
typedef struct SBlockKeyTuple {
|
typedef struct SBlockKeyTuple {
|
||||||
TSKEY skey;
|
TSKEY skey;
|
||||||
|
@ -295,7 +296,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*dataBlocks == NULL) {
|
if (*dataBlocks == NULL) {
|
||||||
int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
|
int32_t ret = createTableDataBlock((size_t)size, rowSize, startOffset, pTableMeta, dataBlocks);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -1371,3 +1372,77 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, SRetrieveTableRsp* pRsp, SVCreateTbReq *pCreateTb){
|
||||||
|
STableDataCxt* pTableCxt = NULL;
|
||||||
|
int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||||
|
pTableMeta, &pCreateTb, &pTableCxt, true);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("insGetTableDataCxt error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
// no need to bind, because select * get all fields
|
||||||
|
ret = initTableColSubmitData(pTableCxt);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
uError( "initTableColSubmitData error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* p = (char*)pRsp->data;
|
||||||
|
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
|
||||||
|
int32_t numOfRows = *(int32_t*)p;
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
|
||||||
|
int32_t numOfCols = *(int32_t*)p;
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
p += sizeof(uint64_t);
|
||||||
|
|
||||||
|
int8_t *fields = p;
|
||||||
|
p += numOfCols * (sizeof(int8_t) + sizeof(int32_t));
|
||||||
|
|
||||||
|
int32_t* colLength = (int32_t*)p;
|
||||||
|
p += sizeof(int32_t) * numOfCols;
|
||||||
|
|
||||||
|
char* pStart = p;
|
||||||
|
|
||||||
|
SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta);
|
||||||
|
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
|
||||||
|
|
||||||
|
if(boundInfo->numOfBound != numOfCols){
|
||||||
|
uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols);
|
||||||
|
ret = TSDB_CODE_INVALID_PARA;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
||||||
|
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
|
||||||
|
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
|
||||||
|
|
||||||
|
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||||
|
uError( "type or bytes not equal");
|
||||||
|
ret = TSDB_CODE_INVALID_PARA;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
colLength[c] = htonl(colLength[c]);
|
||||||
|
int8_t* offset = pStart;
|
||||||
|
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
|
||||||
|
pStart += numOfRows * sizeof(int32_t);
|
||||||
|
} else {
|
||||||
|
pStart += BitmapLen(numOfRows);
|
||||||
|
}
|
||||||
|
char *pData = pStart;
|
||||||
|
|
||||||
|
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
|
||||||
|
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||||
|
pStart += colLength[c];
|
||||||
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
return ret;
|
||||||
|
}
|
Loading…
Reference in New Issue