support bind for sub tables in the same stable
This commit is contained in:
parent
13d3d4c7fa
commit
950212fd35
|
@ -99,6 +99,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
|
||||||
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
|
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
|
||||||
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
|
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
|
||||||
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
|
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
|
||||||
|
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
|
||||||
|
|
||||||
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
|
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
|
||||||
void doRetrieveSubqueryData(SSchedMsg *pMsg);
|
void doRetrieveSubqueryData(SSchedMsg *pMsg);
|
||||||
|
|
|
@ -108,7 +108,8 @@ typedef struct STableDataBlocks {
|
||||||
uint32_t size;
|
uint32_t size;
|
||||||
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
|
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
|
||||||
char *pData;
|
char *pData;
|
||||||
|
bool cloned;
|
||||||
|
|
||||||
SParsedDataColInfo boundColumnInfo;
|
SParsedDataColInfo boundColumnInfo;
|
||||||
|
|
||||||
// for parameter ('?') binding
|
// for parameter ('?') binding
|
||||||
|
|
|
@ -643,7 +643,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
int32_t FORCE_INLINE tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||||
pBlocks->tid = pTableMeta->id.tid;
|
pBlocks->tid = pTableMeta->id.tid;
|
||||||
pBlocks->uid = pTableMeta->id.uid;
|
pBlocks->uid = pTableMeta->id.uid;
|
||||||
pBlocks->sversion = pTableMeta->sversion;
|
pBlocks->sversion = pTableMeta->sversion;
|
||||||
|
|
|
@ -55,6 +55,7 @@ typedef struct SMultiTbStmt {
|
||||||
SStrToken stbname;
|
SStrToken stbname;
|
||||||
SStrToken values;
|
SStrToken values;
|
||||||
SArray *tags;
|
SArray *tags;
|
||||||
|
STableDataBlocks *lastBlock;
|
||||||
SHashObj *pTableHash;
|
SHashObj *pTableHash;
|
||||||
SHashObj *pTableBlockHashList; // data block for each table
|
SHashObj *pTableBlockHashList; // data block for each table
|
||||||
} SMultiTbStmt;
|
} SMultiTbStmt;
|
||||||
|
@ -348,7 +349,7 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// functions for insertion statement preparation
|
// functions for insertion statement preparation
|
||||||
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
|
static FORCE_INLINE int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
|
||||||
if (bind->is_null != NULL && *(bind->is_null)) {
|
if (bind->is_null != NULL && *(bind->is_null)) {
|
||||||
setNull(data + param->offset, param->type, param->bytes);
|
setNull(data + param->offset, param->type, param->bytes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -747,25 +748,25 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
case TSDB_DATA_TYPE_UTINYINT:
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
size = 1;
|
*(uint8_t *)(data + param->offset) = *(uint8_t *)bind->buffer;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
size = 2;
|
*(uint16_t *)(data + param->offset) = *(uint16_t *)bind->buffer;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
case TSDB_DATA_TYPE_UINT:
|
case TSDB_DATA_TYPE_UINT:
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
size = 4;
|
*(uint32_t *)(data + param->offset) = *(uint32_t *)bind->buffer;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
size = 8;
|
*(uint64_t *)(data + param->offset) = *(uint64_t *)bind->buffer;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
@ -791,7 +792,6 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(data + param->offset, bind->buffer, size);
|
|
||||||
if (param->offset == 0) {
|
if (param->offset == 0) {
|
||||||
if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) {
|
if (tsCheckTimestamp(pBlock, data + param->offset) != TSDB_CODE_SUCCESS) {
|
||||||
tscError("invalid timestamp");
|
tscError("invalid timestamp");
|
||||||
|
@ -802,6 +802,101 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t insertStmtGenLastBlock(STableDataBlocks** lastBlock, STableDataBlocks* pBlock) {
|
||||||
|
*lastBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks));
|
||||||
|
memcpy(*lastBlock, pBlock, sizeof(STableDataBlocks));
|
||||||
|
(*lastBlock)->cloned = true;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
void* tmp = malloc((*pBlock)->numOfAllocedParams * sizeof(SParamInfo));
|
||||||
|
memcpy(tmp, (*pBlock)->params, (*pBlock)->numOfAllocedParams * sizeof(SParamInfo));
|
||||||
|
(*pBlock)->params = tmp;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
(*lastBlock)->pData = NULL;
|
||||||
|
(*lastBlock)->ordered = true;
|
||||||
|
(*lastBlock)->prevTS = INT64_MIN;
|
||||||
|
(*lastBlock)->size = sizeof(SSubmitBlk);
|
||||||
|
(*lastBlock)->tsSource = -1;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
if ((*pBlock)->boundColumnInfo.boundedColumns) {
|
||||||
|
tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t));
|
||||||
|
memcpy(tmp, (*pBlock)->boundColumnInfo.boundedColumns, (*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t));
|
||||||
|
(*pBlock)->boundColumnInfo.boundedColumns = tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*pBlock)->boundColumnInfo.cols) {
|
||||||
|
tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn));
|
||||||
|
memcpy(tmp, (*pBlock)->boundColumnInfo.cols, (*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn));
|
||||||
|
(*pBlock)->boundColumnInfo.cols = tmp;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int32_t insertStmtGenBlock(STscStmt* pStmt, STableDataBlocks** pBlock, STableMeta* pTableMeta, SName* name) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (pStmt->mtb.lastBlock == NULL) {
|
||||||
|
tscError("no previous data block");
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t msize = tscGetTableMetaSize(pTableMeta);
|
||||||
|
int32_t tsize = sizeof(STableDataBlocks) + msize;
|
||||||
|
|
||||||
|
void *t = malloc(tsize);
|
||||||
|
*pBlock = t;
|
||||||
|
|
||||||
|
//*pBlock = (STableDataBlocks*)malloc(sizeof(STableDataBlocks));
|
||||||
|
memcpy(*pBlock, pStmt->mtb.lastBlock, sizeof(STableDataBlocks));
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
void* tmp = malloc((*pBlock)->numOfAllocedParams * sizeof(SParamInfo));
|
||||||
|
memcpy(tmp, (*pBlock)->params, (*pBlock)->numOfAllocedParams * sizeof(SParamInfo));
|
||||||
|
(*pBlock)->params = tmp;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
t = (char *)t + sizeof(STableDataBlocks);
|
||||||
|
(*pBlock)->pTableMeta = t;
|
||||||
|
memcpy((*pBlock)->pTableMeta, pTableMeta, msize);
|
||||||
|
|
||||||
|
(*pBlock)->pData = malloc((*pBlock)->nAllocSize);
|
||||||
|
//(*pBlock)->pTableMeta = tscTableMetaDup(pTableMeta);
|
||||||
|
//(*pBlock)->pTableMeta = pTableMeta;
|
||||||
|
|
||||||
|
(*pBlock)->vgId = (*pBlock)->pTableMeta->vgId;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
if ((*pBlock)->boundColumnInfo.boundedColumns) {
|
||||||
|
tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t));
|
||||||
|
memcpy(tmp, (*pBlock)->boundColumnInfo.boundedColumns, (*pBlock)->boundColumnInfo.numOfCols * sizeof(int32_t));
|
||||||
|
(*pBlock)->boundColumnInfo.boundedColumns = tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*pBlock)->boundColumnInfo.cols) {
|
||||||
|
tmp = malloc((*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn));
|
||||||
|
memcpy(tmp, (*pBlock)->boundColumnInfo.cols, (*pBlock)->boundColumnInfo.numOfCols * sizeof(SBoundColumn));
|
||||||
|
(*pBlock)->boundColumnInfo.cols = tmp;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
tNameAssign(&(*pBlock)->tableName, name);
|
||||||
|
|
||||||
|
SSubmitBlk* blk = (SSubmitBlk*)(*pBlock)->pData;
|
||||||
|
memset(blk, 0, sizeof(*blk));
|
||||||
|
|
||||||
|
code = tsSetBlockInfo(blk, pTableMeta, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
STMT_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) {
|
static int doBindBatchParam(STableDataBlocks* pBlock, SParamInfo* param, TAOS_MULTI_BIND* bind, int32_t rowNum) {
|
||||||
if (bind->buffer_type != param->type || !isValidDataType(param->type)) {
|
if (bind->buffer_type != param->type || !isValidDataType(param->type)) {
|
||||||
|
@ -1173,6 +1268,8 @@ static void insertBatchClean(STscStmt* pStmt) {
|
||||||
|
|
||||||
static int insertBatchStmtExecute(STscStmt* pStmt) {
|
static int insertBatchStmtExecute(STscStmt* pStmt) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
int64_t st1 = taosGetTimestampUs();
|
||||||
|
|
||||||
if(pStmt->mtb.nameSet == false) {
|
if(pStmt->mtb.nameSet == false) {
|
||||||
tscError("0x%"PRIx64" no table name set", pStmt->pSql->self);
|
tscError("0x%"PRIx64" no table name set", pStmt->pSql->self);
|
||||||
|
@ -1188,23 +1285,35 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
|
||||||
|
|
||||||
fillTablesColumnsNull(pStmt->pSql);
|
fillTablesColumnsNull(pStmt->pSql);
|
||||||
|
|
||||||
|
int64_t st2 = taosGetTimestampUs();
|
||||||
|
|
||||||
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t st3 = taosGetTimestampUs();
|
||||||
|
|
||||||
code = tscHandleMultivnodeInsert(pStmt->pSql);
|
code = tscHandleMultivnodeInsert(pStmt->pSql);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t st4 = taosGetTimestampUs();
|
||||||
|
|
||||||
// wait for the callback function to post the semaphore
|
// wait for the callback function to post the semaphore
|
||||||
tsem_wait(&pStmt->pSql->rspSem);
|
tsem_wait(&pStmt->pSql->rspSem);
|
||||||
|
|
||||||
|
int64_t st5 = taosGetTimestampUs();
|
||||||
|
|
||||||
code = pStmt->pSql->res.code;
|
code = pStmt->pSql->res.code;
|
||||||
|
|
||||||
insertBatchClean(pStmt);
|
insertBatchClean(pStmt);
|
||||||
|
|
||||||
|
int64_t st6 = taosGetTimestampUs();
|
||||||
|
|
||||||
|
tscDebug("use time:%"PRId64 ",%"PRId64 ",%"PRId64 ",%"PRId64 ",%"PRId64, st2-st1, st3-st2, st4-st3, st5-st4, st6-st5);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1228,11 +1337,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
|
||||||
pStmt->mtb.tbname = sToken;
|
pStmt->mtb.tbname = sToken;
|
||||||
pStmt->mtb.nameSet = false;
|
pStmt->mtb.nameSet = false;
|
||||||
if (pStmt->mtb.pTableHash == NULL) {
|
if (pStmt->mtb.pTableHash == NULL) {
|
||||||
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
pStmt->mtb.pTableHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStmt->mtb.pTableBlockHashList == NULL) {
|
if (pStmt->mtb.pTableBlockHashList == NULL) {
|
||||||
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
pStmt->mtb.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
pStmt->mtb.tagSet = true;
|
pStmt->mtb.tagSet = true;
|
||||||
|
@ -1561,6 +1670,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
|
||||||
|
|
||||||
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
|
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
|
||||||
pCmd->batchSize = pBlk->numOfRows;
|
pCmd->batchSize = pBlk->numOfRows;
|
||||||
|
if (pBlk->numOfRows == 0) {
|
||||||
|
(*t1)->prevTS = INT64_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES);
|
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)t1, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -1580,6 +1692,9 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
|
||||||
tname.n = strlen(name);
|
tname.n = strlen(name);
|
||||||
SName fullname = {0};
|
SName fullname = {0};
|
||||||
tscSetTableFullName(&fullname, &tname, pSql);
|
tscSetTableFullName(&fullname, &tname, pSql);
|
||||||
|
|
||||||
|
memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname));
|
||||||
|
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
STMT_RET(code);
|
STMT_RET(code);
|
||||||
|
@ -1592,25 +1707,21 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
|
||||||
STMT_RET(TSDB_CODE_TSC_APP_ERROR);
|
STMT_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(&pTableMetaInfo->name, &fullname, sizeof(fullname));
|
|
||||||
|
|
||||||
STableDataBlocks* pBlock = NULL;
|
STableDataBlocks* pBlock = NULL;
|
||||||
code = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
|
||||||
pTableMeta->tableInfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pBlock, NULL);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
STMT_RET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitBlk* blk = (SSubmitBlk*)pBlock->pData;
|
insertStmtGenBlock(pStmt, &pBlock, pTableMeta, &pTableMetaInfo->name);
|
||||||
blk->numOfRows = 0;
|
|
||||||
|
|
||||||
|
pCmd->batchSize = 0;
|
||||||
|
|
||||||
pStmt->mtb.currentUid = pTableMeta->id.uid;
|
pStmt->mtb.currentUid = pTableMeta->id.uid;
|
||||||
pStmt->mtb.tbNum++;
|
pStmt->mtb.tbNum++;
|
||||||
|
|
||||||
|
taosHashPut(pCmd->insertParam.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
|
||||||
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
|
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
|
||||||
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
|
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
|
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
|
||||||
|
|
||||||
STMT_RET(TSDB_CODE_SUCCESS);
|
STMT_RET(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1670,6 +1781,10 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
|
||||||
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
|
taosHashPut(pStmt->mtb.pTableBlockHashList, (void *)&pStmt->mtb.currentUid, sizeof(pStmt->mtb.currentUid), (void*)&pBlock, POINTER_BYTES);
|
||||||
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
|
taosHashPut(pStmt->mtb.pTableHash, name, strlen(name), (char*) &pTableMeta->id.uid, sizeof(pTableMeta->id.uid));
|
||||||
|
|
||||||
|
if (pStmt->mtb.lastBlock == NULL) {
|
||||||
|
insertStmtGenLastBlock(&pStmt->mtb.lastBlock, pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
|
tscDebug("0x%"PRIx64" table:%s is prepared, uid:%" PRIx64, pSql->self, name, pStmt->mtb.currentUid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1711,6 +1826,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
|
||||||
if (pStmt->pSql && pStmt->pSql->res.code != 0) {
|
if (pStmt->pSql && pStmt->pSql->res.code != 0) {
|
||||||
rmMeta = true;
|
rmMeta = true;
|
||||||
}
|
}
|
||||||
|
tscDestroyDataBlock(pStmt->mtb.lastBlock, rmMeta);
|
||||||
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
|
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, rmMeta);
|
||||||
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
|
taosHashCleanup(pStmt->pSql->cmd.insertParam.pTableBlockHashList);
|
||||||
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
|
pStmt->pSql->cmd.insertParam.pTableBlockHashList = NULL;
|
||||||
|
@ -1745,6 +1861,8 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
|
||||||
|
|
||||||
pStmt->last = STMT_BIND;
|
pStmt->last = STMT_BIND;
|
||||||
|
|
||||||
|
tscDebug("tableId:%" PRIu64 ", try to bind one row", pStmt->mtb.currentUid);
|
||||||
|
|
||||||
STMT_RET(insertStmtBindParam(pStmt, bind));
|
STMT_RET(insertStmtBindParam(pStmt, bind));
|
||||||
} else {
|
} else {
|
||||||
STMT_RET(normalStmtBindParam(pStmt, bind));
|
STMT_RET(normalStmtBindParam(pStmt, bind));
|
||||||
|
|
|
@ -1478,12 +1478,6 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pDataBlock->pData);
|
tfree(pDataBlock->pData);
|
||||||
tfree(pDataBlock->params);
|
|
||||||
|
|
||||||
// free the refcount for metermeta
|
|
||||||
if (pDataBlock->pTableMeta != NULL) {
|
|
||||||
tfree(pDataBlock->pTableMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (removeMeta) {
|
if (removeMeta) {
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
@ -1492,7 +1486,17 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
|
||||||
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
|
if (!pDataBlock->cloned) {
|
||||||
|
tfree(pDataBlock->params);
|
||||||
|
|
||||||
|
// free the refcount for metermeta
|
||||||
|
if (pDataBlock->pTableMeta != NULL) {
|
||||||
|
tfree(pDataBlock->pTableMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
tscDestroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
|
||||||
|
}
|
||||||
|
|
||||||
tfree(pDataBlock);
|
tfree(pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1630,12 +1634,14 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
|
||||||
dataBuf->nAllocSize = dataBuf->headerSize * 2;
|
dataBuf->nAllocSize = dataBuf->headerSize * 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
|
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
|
||||||
|
dataBuf->pData = malloc(dataBuf->nAllocSize);
|
||||||
if (dataBuf->pData == NULL) {
|
if (dataBuf->pData == NULL) {
|
||||||
tscError("failed to allocated memory, reason:%s", strerror(errno));
|
tscError("failed to allocated memory, reason:%s", strerror(errno));
|
||||||
tfree(dataBuf);
|
tfree(dataBuf);
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
|
||||||
|
|
||||||
//Here we keep the tableMeta to avoid it to be remove by other threads.
|
//Here we keep the tableMeta to avoid it to be remove by other threads.
|
||||||
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
|
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
|
||||||
|
@ -1761,16 +1767,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||||
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
|
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
|
||||||
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
|
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
|
||||||
if (pInsertParam->pTableNameList == NULL) {
|
if (pInsertParam->pTableNameList == NULL) {
|
||||||
pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, POINTER_BYTES);
|
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
|
||||||
} else {
|
|
||||||
memset(pInsertParam->pTableNameList, 0, pInsertParam->numOfTables * POINTER_BYTES);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
|
STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
while(p1) {
|
while(p1) {
|
||||||
STableDataBlocks* pBlocks = *p1;
|
STableDataBlocks* pBlocks = *p1;
|
||||||
tfree(pInsertParam->pTableNameList[i]);
|
//tfree(pInsertParam->pTableNameList[i]);
|
||||||
|
|
||||||
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
|
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
|
||||||
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
|
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
|
||||||
|
@ -1809,14 +1813,12 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
||||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||||
|
|
||||||
if (dataBuf->nAllocSize < destSize) {
|
if (dataBuf->nAllocSize < destSize) {
|
||||||
while (dataBuf->nAllocSize < destSize) {
|
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
|
||||||
dataBuf->nAllocSize = (uint32_t)(dataBuf->nAllocSize * 1.5);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
|
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
|
||||||
if (tmp != NULL) {
|
if (tmp != NULL) {
|
||||||
dataBuf->pData = tmp;
|
dataBuf->pData = tmp;
|
||||||
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
|
//memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
|
||||||
} else { // failed to allocate memory, free already allocated memory and return error code
|
} else { // failed to allocate memory, free already allocated memory and return error code
|
||||||
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
|
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
|
||||||
|
|
||||||
|
@ -4144,7 +4146,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta) {
|
||||||
assert(pTableMeta != NULL);
|
assert(pTableMeta != NULL);
|
||||||
size_t size = tscGetTableMetaSize(pTableMeta);
|
size_t size = tscGetTableMetaSize(pTableMeta);
|
||||||
|
|
||||||
STableMeta* p = calloc(1, size);
|
STableMeta* p = malloc(size);
|
||||||
memcpy(p, pTableMeta, size);
|
memcpy(p, pTableMeta, size);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,7 +306,7 @@ bool tIsValidName(const SName* name) {
|
||||||
SName* tNameDup(const SName* name) {
|
SName* tNameDup(const SName* name) {
|
||||||
assert(name != NULL);
|
assert(name != NULL);
|
||||||
|
|
||||||
SName* p = calloc(1, sizeof(SName));
|
SName* p = malloc(sizeof(SName));
|
||||||
memcpy(p, name, sizeof(SName));
|
memcpy(p, name, sizeof(SName));
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue