Merge pull request #19785 from taosdata/fix/TD-21899

fix:auto create table & NULL NONE problem for taosx
This commit is contained in:
Haojun Liao 2023-02-09 15:31:13 +08:00 committed by GitHub
commit 6cf363600b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 410 additions and 94 deletions

View File

@ -116,9 +116,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* fields,
int numFields);
int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);

View File

@ -1126,7 +1126,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
TAOS_RES* res = taos_query(taos, sql);
SRequestObj* pRequest = (SRequestObj*)res;
code = pRequest->code;
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
code = TSDB_CODE_SUCCESS;
}
taos_free_result(res);
@ -1307,7 +1307,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
@ -1387,7 +1387,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData));
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0);
code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
@ -1505,7 +1505,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, NULL, 0);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if(fields == NULL){
goto end;
}
for(int i = 0; i < pSW->nCols; i++){
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
@ -1515,7 +1526,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed");
return code;
goto end;
}
launchQueryImpl(pRequest, pQuery, true, NULL);
@ -1538,6 +1549,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
SMqTaosxRspObj rspObj = {0};
SDecoder decoder = {0};
STableMeta* pTableMeta = NULL;
SVCreateTbReq* pCreateReqDst = NULL;
terrno = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0);
@ -1605,17 +1617,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
strcpy(pName.tname, tbName);
// find schema data info
SVCreateTbReq pCreateReq = {0};
for (int j = 0; j < rspObj.rsp.createTableNum; j++) {
void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j);
int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j);
SDecoder decoderTmp = {0};
SVCreateTbReq pCreateReq = {0};
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
memset(&pCreateReq, 0, sizeof(SVCreateTbReq));
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp);
uError("WriteRaw: tDecodeSVCreateTbReq error");
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
@ -1625,13 +1637,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
if (strcmp(tbName, pCreateReq.name) == 0) {
strcpy(pName.tname, pCreateReq.ctb.stbName);
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
tDecoderClear(&decoderTmp);
break;
}
tDecoderClear(&decoderTmp);
}
if(pCreateReqDst){
strcpy(pName.tname, pCreateReqDst->ctb.stbName);
}else{
strcpy(pName.tname, tbName);
}
code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName);
@ -1650,16 +1667,38 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end;
}
if(pCreateReqDst){
pTableMeta->vgId = vg.vgId;
pTableMeta->uid = pCreateReqDst->uid;
}
void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId));
if (hData == NULL) {
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, &pCreateReq, NULL, 0);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if(fields == NULL){
goto end;
}
for(int i = 0; i < pSW->nCols; i++){
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
}
pCreateReqDst = NULL;
}
code = smlBuildOutput(pQuery, pVgHash);
if (code != TSDB_CODE_SUCCESS) {
uError("smlBuildOutput failed");
goto end;
}
launchQueryImpl(pRequest, pQuery, true, NULL);
@ -1672,6 +1711,10 @@ end:
destroyRequest(pRequest);
taosHashCleanup(pVgHash);
taosMemoryFreeClear(pTableMeta);
if (pCreateReqDst) {
tdDestroySVCreateTbReq(pCreateReqDst);
taosMemoryFree(pCreateReqDst);
}
return code;
}

View File

@ -2484,6 +2484,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
data += colSizes[col];
colSizes[col] = htonl(colSizes[col]);
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
}
*actualLen = dataLen;

View File

@ -973,9 +973,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
code = 0;
goto _OVER;
} else if (pStb->uid != createReq.suid) {
mError("stb:%s, already exist while create, input suid:%" PRId64 " not match with exist suid:%" PRId64,
createReq.name, createReq.suid, pStb->uid);
terrno = TSDB_CODE_MND_STABLE_UID_NOT_MATCH;
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
code = 0;
goto _OVER;
} else if (createReq.tagVer > 0 || createReq.colVer > 0) {
int32_t tagDelta = createReq.tagVer - pStb->tagVer;

View File

@ -270,8 +270,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen,
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock2(STqReader *pReader);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);

View File

@ -688,6 +688,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId,
TD_VID(pTq->pVnode), req.subKey);
return -1;
}
if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
@ -704,7 +707,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} else {
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;

View File

@ -34,7 +34,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
return 0;
}
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) {
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper);
if (pSW == NULL) {
return -1;
@ -43,7 +43,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs
return 0;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
@ -153,7 +153,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) {
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
continue;
}
} else {
@ -163,7 +163,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
}
if (pRsp->withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);
tqAddBlockSchemaToRsp(pExec, pRsp);
} else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
taosArrayPush(pRsp->blockSchema, &pSW);
@ -241,13 +241,14 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) {
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
/*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
@ -255,23 +256,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
continue;
}
}
if (pHandle->fetchMeta) {
#if 0
SSubmitBlk* pBlk = pReader->pBlock;
int64_t uid = pExec->pExecReader->lastBlkUid;
int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &schemaLen);
taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++;
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
#endif
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
}
taosArrayPush(pRsp->createTableLen, &len);
taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++;
tEncoderClear(&encoder);
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
@ -294,12 +305,13 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) {
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) {
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
@ -307,22 +319,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
continue;
}
}
if (pHandle->fetchMeta) {
#if 0
SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &schemaLen);
taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++;
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
#endif
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
}
taosArrayPush(pRsp->createTableLen, &len);
taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++;
tEncoderClear(&encoder);
}
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/
@ -340,13 +363,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
}
}
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
if (pRsp->blockNum == 0) {
return -1;
}
// if (pRsp->blockNum == 0) {
// return -1;
// }
return 0;
}

View File

@ -332,7 +332,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (tqNextDataBlock2(pReader)) {
// TODO mem free
memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock2(&ret->data, pReader);
int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL);
if (code != 0 || ret->data.info.rows == 0) {
continue;
}
@ -454,12 +454,13 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) return true;
if (filterOutUids == NULL) return true;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
if (ret == NULL) {
return true;
}
pReader->nextBlk++;
}
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
@ -550,7 +551,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
}
#endif
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
ASSERT(pReader->nextBlk < blockSz);
@ -559,6 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
@ -670,7 +672,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
for (int32_t i = 0; i < pCol->nVal; i++) {
tColDataGetValue(pCol, sourceIdx, &colVal);
tColDataGetValue(pCol, i, &colVal);
#if 0
void* val = NULL;
if (IS_STR_DATA_TYPE(colVal.type)) {
@ -710,7 +712,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i);
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
for (int32_t j = 0; j < colActual; j++) {
@ -743,7 +744,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
}
sourceIdx++;
targetIdx++;
break;
} else {
ASSERT(0);
@ -1006,14 +1006,258 @@ FAIL:
}
#endif
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock2(&block, pReader) == 0) {
taosArrayPush(blocks, &block);
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper);
taosArrayPush(schemas, &pSW);
return 0;
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t numOfRows = 0;
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
SColData* pCol = taosArrayGet(pCols, 0);
numOfRows = pCol->nVal;
} else {
SArray* pRows = pSubmitTbData->aRowP;
numOfRows = taosArrayGetSize(pRows);
}
int32_t curRow = 0;
int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned == NULL) return -1;
// convert and scan one block
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
int32_t numOfCols = taosArrayGetSize(pCols);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
for (int32_t j = 0; j < numOfCols; j++){
SColData* pCol = taosArrayGet(pCols, j);
SColVal colVal;
tColDataGetValue(pCol, i, &colVal);
if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal);
buildNew = true;
} else {
bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
if (currentRowAssigned != assigned[j]) {
assigned[j] = currentRowAssigned;
buildNew = true;
}
}
}
if (buildNew) {
if (taosArrayGetSize(blocks) > 0) {
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
block.info.id.uid = uid;
block.info.version = pReader->msg2.ver;
if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
taosArrayPush(blocks, &block);
taosArrayPush(schemas, &pSW);
}
SSDataBlock* pBlock = taosArrayGetLast(blocks);
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
int32_t colActual = blockDataGetNumOfCols(pBlock);
while (targetIdx < colActual) {
SColData* pCol = taosArrayGet(pCols, sourceIdx);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
if (pCol->cid < pColData->info.colId) {
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
tColDataGetValue(pCol, i, &colVal);
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2];
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataAppendNULL(pColData, curRow - lastRow);
}
} else {
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
sourceIdx++;
targetIdx++;
}
}
curRow++;
}
} else {
SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i);
bool buildNew = false;
for (int32_t j = 0; j < pTschema->numOfCols; j++){
SColVal colVal;
tRowGet(pRow, pTschema, j, &colVal);
if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal);
buildNew = true;
} else {
bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
if (currentRowAssigned != assigned[j]) {
assigned[j] = currentRowAssigned;
buildNew = true;
}
}
}
if (buildNew) {
if (taosArrayGetSize(blocks) > 0) {
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
block.info.id.uid = uid;
block.info.version = pReader->msg2.ver;
if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
taosArrayPush(blocks, &block);
taosArrayPush(schemas, &pSW);
}
SSDataBlock* pBlock = taosArrayGetLast(blocks);
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
int32_t colActual = blockDataGetNumOfCols(pBlock);
while (targetIdx < colActual) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
sourceIdx++;
} else if (colVal.cid == pColData->info.colId) {
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2];
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataAppendNULL(pColData, curRow - lastRow);
}
} else {
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
sourceIdx++;
targetIdx++;
}
}
curRow++;
}
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
taosMemoryFree(assigned);
return 0;
FAIL:
taosMemoryFree(assigned);
return -1;
}

View File

@ -1554,7 +1554,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
while (tqNextDataBlock2(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader);
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
@ -1963,7 +1963,7 @@ FETCH_NEXT_BLOCK:
while (tqNextDataBlock2(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader);
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;

View File

@ -609,7 +609,7 @@ static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* f
}
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
int numFields) {
int numFields, bool needChangeLength) {
STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true);
@ -679,10 +679,15 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
// uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c]));
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t);
pStart += colLength[c];
if(needChangeLength) {
pStart += htonl(colLength[c]);
}else{
pStart += colLength[c];
}
}
end:

View File

@ -43,9 +43,9 @@ class TDTestCase:
tdSql.execute('use db_taosx')
tdSql.query("show tables")
if drop:
tdSql.checkRows(10)
tdSql.checkRows(11)
else:
tdSql.checkRows(15)
tdSql.checkRows(16)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
@ -63,20 +63,20 @@ class TDTestCase:
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(2, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
tdSql.checkData(2, 5, "stt4")
tdSql.execute('use abc1')
tdSql.query("show tables")
if drop:
tdSql.checkRows(10)
tdSql.checkRows(11)
else:
tdSql.checkRows(15)
tdSql.checkRows(16)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
@ -94,13 +94,13 @@ class TDTestCase:
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(2, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
tdSql.checkData(2, 5, "stt4")
return

View File

@ -71,6 +71,8 @@ static void msg_process(TAOS_RES* msg) {
printf("write raw data type: %d\n", raw.raw_type);
int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
ASSERT(ret == 0);
tmq_free_raw(raw);
taos_close(pConn);
}
@ -361,7 +363,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
pRes = taos_query(pConn, "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
"stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));