enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
This commit is contained in:
parent
3763a6adae
commit
67fe609129
|
@ -176,8 +176,8 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS
|
|||
STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl,
|
||||
char* msgBuf, int32_t msgBufLen);
|
||||
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* fields,
|
||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen);
|
||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields,
|
||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw);
|
||||
|
||||
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
||||
int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut);
|
||||
|
|
|
@ -1618,7 +1618,7 @@ int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pDat
|
|||
RAW_NULL_CHECK(pVgHash);
|
||||
RAW_RETURN_CHECK(
|
||||
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
|
||||
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0));
|
||||
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
|
||||
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
|
@ -1678,7 +1678,7 @@ int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const cha
|
|||
RAW_NULL_CHECK(pVgHash);
|
||||
RAW_RETURN_CHECK(
|
||||
taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
|
||||
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0));
|
||||
RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
|
||||
RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
|
||||
|
||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||
|
@ -1960,35 +1960,32 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
vgId = vg->vgId;
|
||||
}
|
||||
|
||||
STableMeta** pTableMeta = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName));
|
||||
if (pTableMeta == NULL) {
|
||||
STableMeta* pTableMeta = NULL;
|
||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName));
|
||||
if (pTableMetaTmp == NULL) {
|
||||
if (pCreateReqDst) { // change stable name to get meta
|
||||
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
||||
}
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, pTableMeta));
|
||||
RAW_RETURN_CHECK(taosHashPut(pMetaHash, tbName, strlen(tbName), pTableMeta, POINTER_BYTES));
|
||||
if (pCreateReqDst) {
|
||||
(*pTableMeta)->vgId = vgId;
|
||||
(*pTableMeta)->uid = pCreateReqDst->uid;
|
||||
pCreateReqDst->ctb.suid = (*pTableMeta)->suid;
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
||||
code = taosHashPut(pMetaHash, tbName, strlen(tbName), &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
taosMemoryFree(pTableMeta);
|
||||
goto end;
|
||||
}
|
||||
if (pCreateReqDst) {
|
||||
pTableMeta->vgId = vgId;
|
||||
pTableMeta->uid = pCreateReqDst->uid;
|
||||
pCreateReqDst->ctb.suid = pTableMeta->suid;
|
||||
}
|
||||
}else{
|
||||
pTableMeta = *pTableMetaTmp;
|
||||
}
|
||||
|
||||
// SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||
// RAW_NULL_CHECK(pSW);
|
||||
// TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
|
||||
// RAW_NULL_CHECK(fields);
|
||||
//
|
||||
// for (int i = 0; i < pSW->nCols; i++) {
|
||||
// fields[i].type = pSW->pSchema[i].type;
|
||||
// fields[i].bytes = pSW->pSchema[i].bytes;
|
||||
// tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
|
||||
// }
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pSW);
|
||||
void* rawData = getRawDataFromRes(pRetrieve);
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, *pTableMeta, rawData, pCreateReqDst, NULL, 0, true, err, ERR_MSG_LEN);
|
||||
// code = rawBlockBindData(pQuery, *pTableMeta, rawData, pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
|
||||
// taosMemoryFree(fields);
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
|
||||
goto end;
|
||||
|
|
|
@ -886,8 +886,8 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
|
|||
return false;
|
||||
}
|
||||
|
||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
|
||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen) {
|
||||
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* tFields,
|
||||
int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw) {
|
||||
int ret = 0;
|
||||
if(data == NULL) {
|
||||
uError("rawBlockBindData, data is NULL");
|
||||
|
@ -964,11 +964,16 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
goto end;
|
||||
}
|
||||
if (tFields != NULL && numFields > boundInfo->numOfBound) {
|
||||
if (errstr != NULL)
|
||||
snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
|
||||
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound);
|
||||
ret = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
if (tFields == NULL && numOfCols != boundInfo->numOfBound) {
|
||||
if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound);
|
||||
ret = TSDB_CODE_INVALID_PARA;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (tFields == NULL) {
|
||||
for (int j = 0; j < boundInfo->numOfBound; j++) {
|
||||
SSchema* pColSchema = &pSchema[j];
|
||||
|
@ -1006,7 +1011,13 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
|
|||
for (int i = 0; i < numFields; i++) {
|
||||
for (int j = 0; j < boundInfo->numOfBound; j++) {
|
||||
SSchema* pColSchema = &pSchema[j];
|
||||
if (strcmp(pColSchema->name, tFields[i].name) == 0) {
|
||||
char* fieldName = NULL;
|
||||
if (raw) {
|
||||
fieldName = ((SSchemaWrapper*)tFields)->pSchema[i].name;
|
||||
} else {
|
||||
fieldName = ((TAOS_FIELD*)tFields)[i].name;
|
||||
}
|
||||
if (strcmp(pColSchema->name, fieldName) == 0) {
|
||||
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||
if (errstr != NULL)
|
||||
snprintf(errstr, errstrLen,
|
||||
|
|
|
@ -79,6 +79,7 @@ static void msg_process(TAOS_RES* msg) {
|
|||
} else {
|
||||
taosFprintfFile(g_fp, result);
|
||||
taosFprintfFile(g_fp, "\n");
|
||||
taosFsyncFile(g_fp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue