commit
9a6dd6d497
|
@ -791,12 +791,12 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
||||||
sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
sToken = tStrGetToken(sql, &index, false, 0, NULL);
|
||||||
sql += index;
|
sql += index;
|
||||||
|
|
||||||
STagData *pTag = (STagData *)pCmd->payload;
|
tscAllocPayload(pCmd, sizeof(STagData));
|
||||||
|
STagData *pTag = (STagData *) pCmd->payload;
|
||||||
|
|
||||||
memset(pTag, 0, sizeof(STagData));
|
memset(pTag, 0, sizeof(STagData));
|
||||||
|
|
||||||
/*
|
//the source super table is moved to the secondary position of the pTableMetaInfo list
|
||||||
* the source super table is moved to the secondary position of the pTableMetaInfo list
|
|
||||||
*/
|
|
||||||
if (pQueryInfo->numOfTables < 2) {
|
if (pQueryInfo->numOfTables < 2) {
|
||||||
tscAddEmptyMetaInfo(pQueryInfo);
|
tscAddEmptyMetaInfo(pQueryInfo);
|
||||||
}
|
}
|
||||||
|
@ -897,9 +897,8 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
||||||
index = 0;
|
index = 0;
|
||||||
sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
|
sToken = tStrGetToken(sql, &index, true, numOfIgnoreToken, &ignoreTokenTypes);
|
||||||
sql += index;
|
sql += index;
|
||||||
if (sToken.n == 0) {
|
|
||||||
break;
|
if (sToken.n == 0 || sToken.type == TK_RP) {
|
||||||
} else if (sToken.type == TK_RP) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -913,11 +912,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pTagSchema[colIndex].type == TSDB_DATA_TYPE_BINARY || pTagSchema[colIndex].type == TSDB_DATA_TYPE_NCHAR) &&
|
|
||||||
sToken.n > pTagSchema[colIndex].bytes) {
|
|
||||||
return tscInvalidSQLErrMsg(pCmd->payload, "string too long", sToken.z);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
index = 0;
|
index = 0;
|
||||||
|
@ -1042,9 +1036,9 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380
|
// TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380
|
||||||
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) {
|
// if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
// return code;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if (NULL == pCmd->pTableList) {
|
if (NULL == pCmd->pTableList) {
|
||||||
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||||
|
|
|
@ -608,6 +608,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
|
||||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
|
// the expanded size when a row data is converted to SDataRow format
|
||||||
|
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||||
|
|
||||||
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -627,7 +630,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2;
|
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||||
|
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * MAX_EXPAND_SIZE;
|
||||||
|
|
||||||
if (dataBuf->nAllocSize < destSize) {
|
if (dataBuf->nAllocSize < destSize) {
|
||||||
while (dataBuf->nAllocSize < destSize) {
|
while (dataBuf->nAllocSize < destSize) {
|
||||||
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
|
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
|
||||||
|
@ -648,26 +653,30 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
|
||||||
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
tscSortRemoveDataBlockDupRows(pOneTableBlock);
|
||||||
|
|
||||||
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
|
||||||
|
|
||||||
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
|
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
|
||||||
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
|
||||||
|
|
||||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
|
|
||||||
|
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + MAX_EXPAND_SIZE);
|
||||||
|
|
||||||
pBlocks->tid = htonl(pBlocks->tid);
|
pBlocks->tid = htonl(pBlocks->tid);
|
||||||
pBlocks->uid = htobe64(pBlocks->uid);
|
pBlocks->uid = htobe64(pBlocks->uid);
|
||||||
pBlocks->sversion = htonl(pBlocks->sversion);
|
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||||
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||||
|
|
||||||
pBlocks->len = htonl(len);
|
|
||||||
|
|
||||||
// erase the empty space reserved for binary data
|
// erase the empty space reserved for binary data
|
||||||
len = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
|
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
|
||||||
dataBuf->size += (len + sizeof(SSubmitBlk));
|
assert(finalLen <= len);
|
||||||
|
|
||||||
|
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||||
|
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||||
|
|
||||||
|
// the length does not include the SSubmitBlk structure
|
||||||
|
pBlocks->len = htonl(finalLen);
|
||||||
|
|
||||||
dataBuf->numOfTables += 1;
|
dataBuf->numOfTables += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue