[td-32] fix bugs in inserting data
This commit is contained in:
parent
699c1bbd7d
commit
50705f3d11
|
@ -62,7 +62,7 @@ typedef struct STableMeta {
|
|||
int8_t numOfVpeers;
|
||||
int16_t sversion;
|
||||
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
|
||||
int32_t vgid; // virtual group id, which current table belongs to
|
||||
int32_t vgId; // virtual group id, which current table belongs to
|
||||
int32_t sid; // the index of one table in a virtual node
|
||||
uint64_t uid; // unique id of a table
|
||||
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
|
||||
|
@ -182,7 +182,7 @@ typedef struct STableDataBlocks {
|
|||
char tableId[TSDB_TABLE_ID_LEN];
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
bool ordered; // if current rows are ordered or not
|
||||
int64_t vgid; // virtual group id
|
||||
int64_t vgId; // virtual group id
|
||||
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
|
||||
int32_t numOfTables; // number of tables in current submit block
|
||||
|
||||
|
|
|
@ -698,7 +698,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
|
|||
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData);
|
||||
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
|
||||
|
||||
dataBuf->vgid = pTableMeta->vgid;
|
||||
dataBuf->vgId = pTableMeta->vgId;
|
||||
dataBuf->numOfTables = 1;
|
||||
|
||||
/*
|
||||
|
@ -1058,7 +1058,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
|||
goto _error_clean;
|
||||
}
|
||||
|
||||
void *fp = pSql->fp;
|
||||
ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr;
|
||||
|
||||
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1068,17 +1067,15 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
|||
* And during the getMeterMetaCallback function, the sql string will be parsed from the
|
||||
* interrupted position.
|
||||
*/
|
||||
if (fp != NULL) {
|
||||
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
|
||||
tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo add to return
|
||||
tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
|
||||
pSql->asyncTblPos = NULL;
|
||||
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
|
||||
tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo add to return
|
||||
tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
|
||||
pSql->asyncTblPos = NULL;
|
||||
|
||||
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
|
||||
}
|
||||
|
||||
|
@ -1096,15 +1093,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
|||
goto _error_clean;
|
||||
}
|
||||
|
||||
int32_t numOfCols = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
if (sToken.type == TK_VALUES) {
|
||||
SParsedDataColInfo spd = {.numOfCols = numOfCols};
|
||||
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
|
||||
|
||||
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
|
||||
|
||||
tscSetAssignedColumnInfo(&spd, pSchema, numOfCols);
|
||||
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
|
||||
|
||||
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
|
||||
goto _error_clean;
|
||||
|
@ -1243,7 +1238,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
|||
|
||||
// submit to more than one vnode
|
||||
if (pCmd->pDataBlocks->nSize > 0) {
|
||||
// merge according to vgid
|
||||
// merge according to vgId
|
||||
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
|
||||
goto _error_clean;
|
||||
}
|
||||
|
|
|
@ -165,7 +165,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
|||
|
||||
pTableMeta->sid = pTableMetaMsg->sid;
|
||||
pTableMeta->uid = pTableMetaMsg->uid;
|
||||
pTableMeta->vgid = pTableMetaMsg->vgid;
|
||||
pTableMeta->vgId = pTableMetaMsg->vgId;
|
||||
|
||||
pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers;
|
||||
memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers);
|
||||
|
|
|
@ -341,11 +341,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
|||
* the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately.
|
||||
*/
|
||||
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
|
||||
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
|
||||
(*pSql->fp)(pSql, taosres, rpcMsg->code);
|
||||
} else {
|
||||
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
|
||||
}
|
||||
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
|
||||
|
||||
if (shouldFree) {
|
||||
// If it is failed, all objects allocated during execution taos_connect_a should be released
|
||||
|
@ -539,6 +535,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
char * pMsg, *pStart;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||
|
||||
pStart = pSql->cmd.payload + tsRpcHeadSize;
|
||||
pMsg = pStart;
|
||||
|
@ -546,15 +543,17 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pShellMsg = (SShellSubmitMsg *)pMsg;
|
||||
|
||||
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
|
||||
pShellMsg->vnode = 0; //htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode);
|
||||
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
|
||||
pShellMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
|
||||
pShellMsg->header.contLen = pSql->cmd.payloadLen;
|
||||
pShellMsg->numOfTables = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
|
||||
|
||||
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
|
||||
// htons(pShellMsg->vnode));
|
||||
|
||||
pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
|
||||
// pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -676,7 +675,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->uid = pTableMeta->uid;
|
||||
pQueryMsg->numOfTagsCols = 0;
|
||||
|
||||
pQueryMsg->vgId = htonl(pTableMeta->vgid);
|
||||
pQueryMsg->vgId = htonl(pTableMeta->vgId);
|
||||
tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name);
|
||||
} else { // query on super table
|
||||
if (pTableMetaInfo->vnodeIndex < 0) {
|
||||
|
@ -1849,12 +1848,12 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
|
||||
pMetaMsg->sid = htonl(pMetaMsg->sid);
|
||||
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
||||
pMetaMsg->vgid = htonl(pMetaMsg->vgid);
|
||||
pMetaMsg->vgId = htonl(pMetaMsg->vgId);
|
||||
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
||||
pMetaMsg->contLen = htons(pMetaMsg->contLen);
|
||||
|
||||
if (pMetaMsg->sid < 0 || pMetaMsg->vgid < 0) {
|
||||
tscError("invalid meter vgid:%d, sid%d", pMetaMsg->vgid, pMetaMsg->sid);
|
||||
if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) {
|
||||
tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid);
|
||||
return TSDB_CODE_INVALID_VALUE;
|
||||
}
|
||||
|
||||
|
@ -1948,11 +1947,11 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
|||
|
||||
pMeta->sid = htonl(pMeta->sid);
|
||||
pMeta->sversion = htons(pMeta->sversion);
|
||||
pMeta->vgid = htonl(pMeta->vgid);
|
||||
pMeta->vgId = htonl(pMeta->vgId);
|
||||
pMeta->uid = htobe64(pMeta->uid);
|
||||
|
||||
if (pMeta->sid <= 0 || pMeta->vgid < 0) {
|
||||
tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid);
|
||||
if (pMeta->sid <= 0 || pMeta->vgId < 0) {
|
||||
tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
|
||||
pSql->res.code = TSDB_CODE_INVALID_VALUE;
|
||||
pSql->res.numOfTotal = i;
|
||||
return TSDB_CODE_OTHERS;
|
||||
|
|
|
@ -130,7 +130,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
|||
pSql->pTscObj = pObj;
|
||||
pSql->signature = pSql;
|
||||
tsem_init(&pSql->rspSem, 0, 0);
|
||||
// tsem_init(&pSql->emptyRspSem, 0, 1);
|
||||
|
||||
pObj->pSql = pSql;
|
||||
pSql->fp = fp;
|
||||
pSql->param = param;
|
||||
|
@ -146,6 +146,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
|
||||
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,6 @@ void * pTscMgmtConn;
|
|||
void * pSlaveConn;
|
||||
void * tscCacheHandle;
|
||||
int32_t globalCode = 0;
|
||||
int initialized = 0;
|
||||
int slaveIndex;
|
||||
void * tscTmr;
|
||||
void * tscQhandle;
|
||||
|
@ -187,9 +186,7 @@ void taos_init_imp() {
|
|||
|
||||
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
|
||||
|
||||
initialized = 1;
|
||||
tscTrace("client is initialized successfully");
|
||||
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
|
||||
}
|
||||
|
||||
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
|
||||
|
|
|
@ -614,7 +614,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
|
|||
*/
|
||||
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
|
||||
|
||||
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100);
|
||||
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -705,8 +705,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
|||
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i];
|
||||
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
int32_t ret =
|
||||
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE,
|
||||
|
||||
int32_t ret =
|
||||
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
|
||||
|
|
|
@ -198,10 +198,20 @@ typedef struct {
|
|||
} SShellSubmitBlock;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVnodes;
|
||||
} SMsgDesc;
|
||||
|
||||
typedef struct SMsgHead {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
} SMsgHead;
|
||||
|
||||
typedef struct {
|
||||
SMsgDesc desc;
|
||||
SMsgHead header;
|
||||
int16_t import;
|
||||
int16_t vnode;
|
||||
int32_t numOfSid; /* total number of sid */
|
||||
char blks[]; /* numOfSid blocks, each blocks for one table */
|
||||
int32_t numOfTables; // total number of sid
|
||||
char blks[]; // number of data blocks, each table has at least one data block
|
||||
} SShellSubmitMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -232,15 +242,6 @@ typedef struct {
|
|||
uint32_t ip;
|
||||
} SVnodeDesc;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfVnodes;
|
||||
} SMsgDesc;
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
} SMsgHead;
|
||||
|
||||
typedef struct {
|
||||
int32_t contLen;
|
||||
int32_t vgId;
|
||||
|
@ -688,7 +689,7 @@ typedef struct STableMetaMsg {
|
|||
int8_t numOfVpeers;
|
||||
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
|
||||
int32_t sid;
|
||||
int32_t vgid;
|
||||
int32_t vgId;
|
||||
uint64_t uid;
|
||||
SSchema schema[];
|
||||
} STableMetaMsg;
|
||||
|
|
|
@ -445,7 +445,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
|
|||
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
|
||||
pMeta->uid = htobe64(pTable->uid);
|
||||
pMeta->sid = htonl(pTable->sid);
|
||||
pMeta->vgid = htonl(pTable->vgId);
|
||||
pMeta->vgId = htonl(pTable->vgId);
|
||||
pMeta->sversion = htons(pTable->superTable->sversion);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->numOfTags = pTable->superTable->numOfTags;
|
||||
|
|
|
@ -524,7 +524,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *p
|
|||
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
|
||||
pMeta->uid = htobe64(pTable->uid);
|
||||
pMeta->sid = htonl(pTable->sid);
|
||||
pMeta->vgid = htonl(pTable->vgId);
|
||||
pMeta->vgId = htonl(pTable->vgId);
|
||||
pMeta->sversion = htons(pTable->sversion);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->numOfTags = 0;
|
||||
|
|
|
@ -654,7 +654,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
|
|||
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
|
||||
pMeta->uid = htobe64(pTable->uid);
|
||||
pMeta->sid = htonl(pTable->sid);
|
||||
pMeta->vgid = htonl(pTable->vgId);
|
||||
pMeta->vgId = htonl(pTable->vgId);
|
||||
pMeta->sversion = htons(pTable->sversion);
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
pMeta->numOfTags = pTable->numOfTags;
|
||||
|
|
Loading…
Reference in New Issue