fix: enable client dedup for stmt/stmt2 inserts in interlace mode
This commit is contained in:
parent
81ff7cd60b
commit
074c3e7eb2
|
@ -378,7 +378,7 @@ typedef struct {
|
||||||
TAOS_MULTI_BIND *bind;
|
TAOS_MULTI_BIND *bind;
|
||||||
} SBindInfo;
|
} SBindInfo;
|
||||||
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||||
SArray *rowArray);
|
SArray *rowArray, bool *orderedDup);
|
||||||
|
|
||||||
// stmt2 binding
|
// stmt2 binding
|
||||||
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
|
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
|
||||||
|
@ -392,7 +392,7 @@ typedef struct {
|
||||||
} SBindInfo2;
|
} SBindInfo2;
|
||||||
|
|
||||||
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||||
SArray *rowArray);
|
SArray *rowArray, bool *orderedDup);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -449,9 +449,10 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para
|
||||||
* `infoSorted` is whether the bind information is sorted by column id
|
* `infoSorted` is whether the bind information is sorted by column id
|
||||||
* `pTSchema` is the schema of the table
|
* `pTSchema` is the schema of the table
|
||||||
* `rowArray` is the array to store the rows
|
* `rowArray` is the array to store the rows
|
||||||
|
* `orderedDup` is an array to store ordered and duplicateTs
|
||||||
*/
|
*/
|
||||||
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||||
SArray *rowArray) {
|
SArray *rowArray, bool *orderedDup) {
|
||||||
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
|
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -469,6 +470,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRowKey rowKey, lastRowKey;
|
||||||
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
|
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
|
||||||
taosArrayClear(colValArray);
|
taosArrayClear(colValArray);
|
||||||
|
|
||||||
|
@ -507,6 +509,24 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (orderedDup) {
|
||||||
|
tRowGetKey(row, &rowKey);
|
||||||
|
if (iRow == 0) {
|
||||||
|
// init to ordered by default
|
||||||
|
orderedDup[0] = true;
|
||||||
|
// init to non-duplicate by default
|
||||||
|
orderedDup[1] = false;
|
||||||
|
} else {
|
||||||
|
// no more compare if we already get disordered or duplicate rows
|
||||||
|
if (orderedDup[0] && !orderedDup[1]) {
|
||||||
|
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
|
||||||
|
orderedDup[0] = (code >= 0);
|
||||||
|
orderedDup[1] = (code == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastRowKey = rowKey;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -3235,9 +3255,10 @@ _exit:
|
||||||
* `infoSorted` is whether the bind information is sorted by column id
|
* `infoSorted` is whether the bind information is sorted by column id
|
||||||
* `pTSchema` is the schema of the table
|
* `pTSchema` is the schema of the table
|
||||||
* `rowArray` is the array to store the rows
|
* `rowArray` is the array to store the rows
|
||||||
|
* `orderedDup` is an array to store ordered and duplicateTs
|
||||||
*/
|
*/
|
||||||
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
|
||||||
SArray *rowArray) {
|
SArray *rowArray, bool *orderedDup) {
|
||||||
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
|
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRowKey rowKey, lastRowKey;
|
||||||
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
|
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
|
||||||
taosArrayClear(colValArray);
|
taosArrayClear(colValArray);
|
||||||
|
|
||||||
|
@ -3317,6 +3339,24 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (orderedDup) {
|
||||||
|
tRowGetKey(row, &rowKey);
|
||||||
|
if (iRow == 0) {
|
||||||
|
// init to ordered by default
|
||||||
|
orderedDup[0] = true;
|
||||||
|
// init to non-duplicate by default
|
||||||
|
orderedDup[1] = false;
|
||||||
|
} else {
|
||||||
|
// no more compare if we already get disordered or duplicate rows
|
||||||
|
if (orderedDup[0] && !orderedDup[1]) {
|
||||||
|
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
|
||||||
|
orderedDup[0] = (code >= 0);
|
||||||
|
orderedDup[1] = (code == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastRowKey = rowKey;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
|
@ -323,6 +323,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int16_t lastColId = -1;
|
int16_t lastColId = -1;
|
||||||
bool colInOrder = true;
|
bool colInOrder = true;
|
||||||
|
bool orderedDup[2];
|
||||||
|
|
||||||
if (NULL == *pTSchema) {
|
if (NULL == *pTSchema) {
|
||||||
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
||||||
|
@ -368,7 +369,9 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
|
||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
|
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
|
||||||
|
pDataBlock->ordered = orderedDup[0];
|
||||||
|
pDataBlock->duplicateTs = orderedDup[1];
|
||||||
|
|
||||||
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
|
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
|
||||||
|
|
||||||
|
@ -689,6 +692,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
|
||||||
int16_t lastColId = -1;
|
int16_t lastColId = -1;
|
||||||
bool colInOrder = true;
|
bool colInOrder = true;
|
||||||
int ncharColNums = 0;
|
int ncharColNums = 0;
|
||||||
|
bool orderedDup[2];
|
||||||
|
|
||||||
if (NULL == *pTSchema) {
|
if (NULL == *pTSchema) {
|
||||||
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
*pTSchema = tBuildTSchema(pSchema, pDataBlock->pMeta->tableInfo.numOfColumns, pDataBlock->pMeta->sversion);
|
||||||
|
@ -745,7 +749,9 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
|
||||||
pBindInfos[c].bytes = pColSchema->bytes;
|
pBindInfos[c].bytes = pColSchema->bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
|
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, orderedDup);
|
||||||
|
pDataBlock->ordered = orderedDup[0];
|
||||||
|
pDataBlock->duplicateTs = orderedDup[1];
|
||||||
|
|
||||||
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
|
qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,8 @@ exe:
|
||||||
# gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
|
# gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
|
||||||
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
|
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
|
||||||
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
|
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
|
||||||
|
gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS)
|
||||||
|
gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS)
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm $(ROOT)batchprepare
|
rm $(ROOT)batchprepare
|
||||||
|
@ -47,3 +49,5 @@ clean:
|
||||||
rm $(ROOT)stmt2-get-fields
|
rm $(ROOT)stmt2-get-fields
|
||||||
rm $(ROOT)stmt2-nohole
|
rm $(ROOT)stmt2-nohole
|
||||||
rm $(ROOT)stmt-crash
|
rm $(ROOT)stmt-crash
|
||||||
|
rm $(ROOT)stmt-insert-dupkeys
|
||||||
|
rm $(ROOT)stmt2-insert-dupkeys
|
||||||
|
|
|
@ -0,0 +1,234 @@
|
||||||
|
// compile with
|
||||||
|
// gcc -o stmt-insert-dupkeys stmt-insert-dupkeys.c -ltaos
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
|
#define NUMROWS 3
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief execute sql only and ignore result set
|
||||||
|
*
|
||||||
|
* @param taos
|
||||||
|
* @param sql
|
||||||
|
*/
|
||||||
|
void executeSQL(TAOS *taos, const char *sql) {
|
||||||
|
TAOS_RES *res = taos_query(taos, sql);
|
||||||
|
int code = taos_errno(res);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("%s\n", taos_errstr(res));
|
||||||
|
taos_free_result(res);
|
||||||
|
taos_close(taos);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
taos_free_result(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief exit program when error occur.
|
||||||
|
*
|
||||||
|
* @param stmt
|
||||||
|
* @param code
|
||||||
|
* @param msg
|
||||||
|
*/
|
||||||
|
void checkErrorCode(TAOS_STMT *stmt, int code, const char *msg) {
|
||||||
|
if (code != 0) {
|
||||||
|
printf("%s. error: %s\n", msg, taos_stmt_errstr(stmt));
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void prepareBindTags(TAOS_MULTI_BIND *tags) {
|
||||||
|
// bind table name and tags
|
||||||
|
char *location = "California.SanFrancisco";
|
||||||
|
int groupId = 2;
|
||||||
|
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
tags[0].buffer_length = strlen(location);
|
||||||
|
tags[0].length = (int32_t *)&tags[0].buffer_length;
|
||||||
|
tags[0].buffer = location;
|
||||||
|
tags[0].is_null = NULL;
|
||||||
|
|
||||||
|
tags[1].buffer_type = TSDB_DATA_TYPE_INT;
|
||||||
|
tags[1].buffer_length = sizeof(int);
|
||||||
|
tags[1].length = (int32_t *)&tags[1].buffer_length;
|
||||||
|
tags[1].buffer = &groupId;
|
||||||
|
tags[1].is_null = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void prepareBindParams(TAOS_MULTI_BIND *params, int64_t *ts, float *current, int *voltage, float *phase) {
|
||||||
|
// is_null array
|
||||||
|
char is_null[NUMROWS] = {0};
|
||||||
|
// length array
|
||||||
|
int32_t int64Len[NUMROWS] = {sizeof(int64_t)};
|
||||||
|
int32_t floatLen[NUMROWS] = {sizeof(float)};
|
||||||
|
int32_t intLen[NUMROWS] = {sizeof(int)};
|
||||||
|
|
||||||
|
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
params[0].buffer_length = sizeof(int64_t);
|
||||||
|
params[0].buffer = ts;
|
||||||
|
params[0].length = int64Len;
|
||||||
|
params[0].is_null = is_null;
|
||||||
|
params[0].num = NUMROWS;
|
||||||
|
|
||||||
|
params[1].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||||
|
params[1].buffer_length = sizeof(float);
|
||||||
|
params[1].buffer = current;
|
||||||
|
params[1].length = floatLen;
|
||||||
|
params[1].is_null = is_null;
|
||||||
|
params[1].num = NUMROWS;
|
||||||
|
|
||||||
|
params[2].buffer_type = TSDB_DATA_TYPE_INT;
|
||||||
|
params[2].buffer_length = sizeof(int);
|
||||||
|
params[2].buffer = voltage;
|
||||||
|
params[2].length = intLen;
|
||||||
|
params[2].is_null = is_null;
|
||||||
|
params[2].num = NUMROWS;
|
||||||
|
|
||||||
|
params[3].buffer_type = TSDB_DATA_TYPE_FLOAT;
|
||||||
|
params[3].buffer_length = sizeof(float);
|
||||||
|
params[3].buffer = phase;
|
||||||
|
params[3].length = floatLen;
|
||||||
|
params[3].is_null = is_null;
|
||||||
|
params[3].num = NUMROWS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief insert data using stmt API
|
||||||
|
*
|
||||||
|
* @param taos
|
||||||
|
*/
|
||||||
|
void insertData(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) {
|
||||||
|
// init
|
||||||
|
TAOS_STMT *stmt = taos_stmt_init(taos);
|
||||||
|
|
||||||
|
// prepare
|
||||||
|
const char *sql = "INSERT INTO ? USING meters TAGS(?, ?) values(?, ?, ?, ?)";
|
||||||
|
int code = taos_stmt_prepare(stmt, sql, 0);
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
|
||||||
|
|
||||||
|
// bind table name and tags
|
||||||
|
TAOS_MULTI_BIND tags[2];
|
||||||
|
prepareBindTags(tags);
|
||||||
|
code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");
|
||||||
|
|
||||||
|
TAOS_MULTI_BIND params[4];
|
||||||
|
prepareBindParams(params, ts, current, voltage, phase);
|
||||||
|
|
||||||
|
code = taos_stmt_bind_param_batch(stmt, params); // bind batch
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");
|
||||||
|
|
||||||
|
code = taos_stmt_add_batch(stmt); // add batch
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
|
||||||
|
|
||||||
|
// execute
|
||||||
|
code = taos_stmt_execute(stmt);
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
|
||||||
|
|
||||||
|
int affectedRows = taos_stmt_affected_rows(stmt);
|
||||||
|
printf("successfully inserted %d rows\n", affectedRows);
|
||||||
|
|
||||||
|
// close
|
||||||
|
(void)taos_stmt_close(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
void insertDataInterlace(TAOS *taos, int64_t *ts, float *current, int *voltage, float *phase) {
|
||||||
|
// init with interlace mode
|
||||||
|
TAOS_STMT_OPTIONS op;
|
||||||
|
op.reqId = 0;
|
||||||
|
op.singleStbInsert = true;
|
||||||
|
op.singleTableBindOnce = true;
|
||||||
|
TAOS_STMT *stmt = taos_stmt_init_with_options(taos, &op);
|
||||||
|
|
||||||
|
// prepare
|
||||||
|
const char *sql = "INSERT INTO ? values(?, ?, ?, ?)";
|
||||||
|
int code = taos_stmt_prepare(stmt, sql, 0);
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
|
||||||
|
|
||||||
|
// bind table name and tags
|
||||||
|
TAOS_MULTI_BIND tags[2];
|
||||||
|
prepareBindTags(tags);
|
||||||
|
code = taos_stmt_set_tbname_tags(stmt, "d1001", tags);
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_set_tbname_tags");
|
||||||
|
|
||||||
|
TAOS_MULTI_BIND params[4];
|
||||||
|
prepareBindParams(params, ts, current, voltage, phase);
|
||||||
|
|
||||||
|
code = taos_stmt_bind_param_batch(stmt, params); // bind batch
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_bind_param_batch");
|
||||||
|
|
||||||
|
code = taos_stmt_add_batch(stmt); // add batch
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_add_batch");
|
||||||
|
|
||||||
|
// execute
|
||||||
|
code = taos_stmt_execute(stmt);
|
||||||
|
checkErrorCode(stmt, code, "failed to execute taos_stmt_execute");
|
||||||
|
|
||||||
|
int affectedRows = taos_stmt_affected_rows(stmt);
|
||||||
|
printf("successfully inserted %d rows\n", affectedRows);
|
||||||
|
|
||||||
|
// close
|
||||||
|
(void)taos_stmt_close(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 6030);
|
||||||
|
if (taos == NULL) {
|
||||||
|
printf("failed to connect to server\n");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
executeSQL(taos, "DROP DATABASE IF EXISTS power");
|
||||||
|
executeSQL(taos, "CREATE DATABASE power");
|
||||||
|
executeSQL(taos, "USE power");
|
||||||
|
executeSQL(taos,
|
||||||
|
"CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), "
|
||||||
|
"groupId INT)");
|
||||||
|
|
||||||
|
// initial insert, expect insert 3 rows
|
||||||
|
int64_t ts0[] = {1648432611234, 1648432611345, 1648432611456};
|
||||||
|
float current0[] = {10.1f, 10.2f, 10.3f};
|
||||||
|
int voltage0[] = {216, 217, 218};
|
||||||
|
float phase0[] = {0.31f, 0.32f, 0.33f};
|
||||||
|
insertData(taos, ts0, current0, voltage0, phase0);
|
||||||
|
|
||||||
|
// insert with interlace mode, send non-duplicate ts, expect insert 3 overlapped rows
|
||||||
|
int64_t ts1[] = {1648432611234, 1648432611345, 1648432611456};
|
||||||
|
int voltage1[] = {219, 220, 221};
|
||||||
|
insertDataInterlace(taos, ts1, current0, voltage1, phase0);
|
||||||
|
|
||||||
|
// insert with interlace mode, send duplicate ts, expect insert 2 rows with dups merged
|
||||||
|
int64_t ts2[] = {1648432611678, 1648432611678, 1648432611789};
|
||||||
|
int voltage2[] = {222, 223, 224};
|
||||||
|
insertDataInterlace(taos, ts2, current0, voltage2, phase0);
|
||||||
|
|
||||||
|
// insert with interlace mode, send disordered rows, expect insert 3 sorted rows
|
||||||
|
int64_t ts3[] = {1648432611900, 1648432611890, 1648432611910};
|
||||||
|
int voltage3[] = {225, 226, 227};
|
||||||
|
insertDataInterlace(taos, ts3, current0, voltage3, phase0);
|
||||||
|
|
||||||
|
// insert with interlace mode, send disordered and duplicate rows, expect insert 2 sorted and dup-merged rows
|
||||||
|
int64_t ts4[] = {1648432611930, 1648432611920, 1648432611930};
|
||||||
|
int voltage4[] = {228, 229, 230};
|
||||||
|
insertDataInterlace(taos, ts4, current0, voltage4, phase0);
|
||||||
|
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
|
|
||||||
|
// final results
|
||||||
|
// taos> select * from d1001;
|
||||||
|
// ts | current | voltage | phase |
|
||||||
|
// ======================================================================================
|
||||||
|
// 2022-03-28 09:56:51.234 | 10.1000004 | 219 | 0.3100000 |
|
||||||
|
// 2022-03-28 09:56:51.345 | 10.1999998 | 220 | 0.3200000 |
|
||||||
|
// 2022-03-28 09:56:51.456 | 10.3000002 | 221 | 0.3300000 |
|
||||||
|
// 2022-03-28 09:56:51.678 | 10.1999998 | 223 | 0.3200000 |
|
||||||
|
// 2022-03-28 09:56:51.789 | 10.3000002 | 224 | 0.3300000 |
|
||||||
|
// 2022-03-28 09:56:51.890 | 10.1999998 | 226 | 0.3200000 |
|
||||||
|
// 2022-03-28 09:56:51.900 | 10.1000004 | 225 | 0.3100000 |
|
||||||
|
// 2022-03-28 09:56:51.910 | 10.3000002 | 227 | 0.3300000 |
|
||||||
|
// 2022-03-28 09:56:51.920 | 10.1999998 | 229 | 0.3200000 |
|
||||||
|
// 2022-03-28 09:56:51.930 | 10.3000002 | 230 | 0.3300000 |
|
||||||
|
// Query OK, 10 row(s) in set (0.005083s)
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,212 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "taos.h"
|
||||||
|
|
||||||
|
int CTB_NUMS = 3;
|
||||||
|
int ROW_NUMS = 3;
|
||||||
|
|
||||||
|
void do_query(TAOS* taos, const char* sql) {
|
||||||
|
TAOS_RES* result = taos_query(taos, sql);
|
||||||
|
int code = taos_errno(result);
|
||||||
|
if (code) {
|
||||||
|
printf("failed to query: %s, reason:%s\n", sql, taos_errstr(result));
|
||||||
|
taos_free_result(result);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
taos_free_result(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
void createdb(TAOS* taos) {
|
||||||
|
do_query(taos, "drop database if exists db");
|
||||||
|
do_query(taos, "create database db");
|
||||||
|
do_query(taos, "create stable db.stb (ts timestamp, b binary(10)) tags(t1 int, t2 binary(10))");
|
||||||
|
do_query(taos, "use db");
|
||||||
|
}
|
||||||
|
|
||||||
|
#define INIT(tbs, ts, ts_len, b, b_len, tags, paramv) \
|
||||||
|
do { \
|
||||||
|
/* tbname */ \
|
||||||
|
tbs = (char**)malloc(CTB_NUMS * sizeof(char*)); \
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) { \
|
||||||
|
tbs[i] = (char*)malloc(sizeof(char) * 20); \
|
||||||
|
sprintf(tbs[i], "ctb_%d", i); \
|
||||||
|
} \
|
||||||
|
/* col params */ \
|
||||||
|
ts = (int64_t**)malloc(CTB_NUMS * sizeof(int64_t*)); \
|
||||||
|
b = (char**)malloc(CTB_NUMS * sizeof(char*)); \
|
||||||
|
ts_len = (int*)malloc(ROW_NUMS * sizeof(int)); \
|
||||||
|
b_len = (int*)malloc(ROW_NUMS * sizeof(int)); \
|
||||||
|
for (int i = 0; i < ROW_NUMS; i++) { \
|
||||||
|
ts_len[i] = sizeof(int64_t); \
|
||||||
|
b_len[i] = 1; \
|
||||||
|
} \
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) { \
|
||||||
|
ts[i] = (int64_t*)malloc(ROW_NUMS * sizeof(int64_t)); \
|
||||||
|
b[i] = (char*)malloc(ROW_NUMS * sizeof(char)); \
|
||||||
|
for (int j = 0; j < ROW_NUMS; j++) { \
|
||||||
|
ts[i][j] = 1591060628000 + j; \
|
||||||
|
b[i][j] = (char)('a' + j); \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
/*tag params */ \
|
||||||
|
int t1 = 0; \
|
||||||
|
int t1len = sizeof(int); \
|
||||||
|
int t2len = 3; \
|
||||||
|
/* bind params */ \
|
||||||
|
paramv = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); \
|
||||||
|
tags = (TAOS_STMT2_BIND**)malloc(CTB_NUMS * sizeof(TAOS_STMT2_BIND*)); \
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) { \
|
||||||
|
/* create tags */ \
|
||||||
|
tags[i] = (TAOS_STMT2_BIND*)malloc(2 * sizeof(TAOS_STMT2_BIND)); \
|
||||||
|
tags[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_INT, &t1, &t1len, NULL, 0}; \
|
||||||
|
tags[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, "after", &t2len, NULL, 0}; \
|
||||||
|
/* create col params */ \
|
||||||
|
paramv[i] = (TAOS_STMT2_BIND*)malloc(2 * sizeof(TAOS_STMT2_BIND)); \
|
||||||
|
paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS}; \
|
||||||
|
paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS}; \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define UINIT(tbs, ts, ts_len, b, b_len, tags, paramv) \
|
||||||
|
do { \
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) { \
|
||||||
|
free(tbs[i]); \
|
||||||
|
} \
|
||||||
|
free(tbs); \
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) { \
|
||||||
|
free(ts[i]); \
|
||||||
|
free(b[i]); \
|
||||||
|
} \
|
||||||
|
free(ts); \
|
||||||
|
free(b); \
|
||||||
|
free(ts_len); \
|
||||||
|
free(b_len); \
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) { \
|
||||||
|
free(tags[i]); \
|
||||||
|
free(paramv[i]); \
|
||||||
|
} \
|
||||||
|
free(tags); \
|
||||||
|
free(paramv); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
void insert(TAOS* taos, char **tbs, TAOS_STMT2_BIND **tags, TAOS_STMT2_BIND **paramv, const char* sql)
|
||||||
|
{
|
||||||
|
clock_t start, end;
|
||||||
|
double cpu_time_used;
|
||||||
|
|
||||||
|
TAOS_STMT2_OPTION option = {0, true, true, NULL, NULL};
|
||||||
|
TAOS_STMT2 *stmt = taos_stmt2_init(taos, &option);
|
||||||
|
int code = taos_stmt2_prepare(stmt, sql, 0);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("failed to execute taos_stmt2_prepare. error:%s\n", taos_stmt2_error(stmt));
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// bind
|
||||||
|
start = clock();
|
||||||
|
TAOS_STMT2_BINDV bindv = {CTB_NUMS, tbs, tags, paramv};
|
||||||
|
if (taos_stmt2_bind_param(stmt, &bindv, -1)) {
|
||||||
|
printf("failed to execute taos_stmt2_bind_param statement.error:%s\n", taos_stmt2_error(stmt));
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
end = clock();
|
||||||
|
cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC;
|
||||||
|
printf("stmt2-bind [%s] insert Time used: %f seconds\n", sql, cpu_time_used);
|
||||||
|
start = clock();
|
||||||
|
|
||||||
|
// exec
|
||||||
|
if (taos_stmt2_exec(stmt, NULL)) {
|
||||||
|
printf("failed to execute taos_stmt2_exec statement.error:%s\n", taos_stmt2_error(stmt));
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
end = clock();
|
||||||
|
cpu_time_used = ((double)(end - start)) / CLOCKS_PER_SEC;
|
||||||
|
printf("stmt2-exec [%s] insert Time used: %f seconds\n", sql, cpu_time_used);
|
||||||
|
|
||||||
|
taos_stmt2_close(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
void insert_dist(TAOS* taos, const char *sql) {
|
||||||
|
char **tbs, **b;
|
||||||
|
int64_t **ts;
|
||||||
|
int *ts_len, *b_len;
|
||||||
|
TAOS_STMT2_BIND **paramv, **tags;
|
||||||
|
|
||||||
|
INIT(tbs, ts, ts_len, b, b_len, tags, paramv);
|
||||||
|
|
||||||
|
insert(taos, tbs, tags, paramv, sql);
|
||||||
|
|
||||||
|
UINIT(tbs, ts, ts_len, b, b_len, tags, paramv);
|
||||||
|
}
|
||||||
|
|
||||||
|
void insert_dup(TAOS* taos, const char *sql) {
|
||||||
|
char **tbs, **b;
|
||||||
|
int64_t **ts;
|
||||||
|
int *ts_len, *b_len;
|
||||||
|
TAOS_STMT2_BIND **paramv, **tags;
|
||||||
|
|
||||||
|
INIT(tbs, ts, ts_len, b, b_len, tags, paramv);
|
||||||
|
|
||||||
|
// insert duplicate rows
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
for (int j = 0; j < ROW_NUMS; j++) {
|
||||||
|
ts[i][j] = 1591060628000;
|
||||||
|
b[i][j] = (char)('x' + j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = 0; i < CTB_NUMS; i++) {
|
||||||
|
paramv[i][0] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_TIMESTAMP, &ts[i][0], &ts_len[0], NULL, ROW_NUMS};
|
||||||
|
paramv[i][1] = (TAOS_STMT2_BIND){TSDB_DATA_TYPE_BINARY, &b[i][0], &b_len[0], NULL, ROW_NUMS};
|
||||||
|
}
|
||||||
|
insert(taos, tbs, tags, paramv, sql);
|
||||||
|
|
||||||
|
UINIT(tbs, ts, ts_len, b, b_len, tags, paramv);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
TAOS* taos = taos_connect("localhost", "root", "taosdata", "", 0);
|
||||||
|
if (!taos) {
|
||||||
|
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
createdb(taos);
|
||||||
|
// insert distinct rows
|
||||||
|
insert_dist(taos, "insert into db.? using db.stb tags(?,?)values(?,?)");
|
||||||
|
// insert duplicate rows
|
||||||
|
insert_dup(taos, "insert into db.? values(?,?)");
|
||||||
|
|
||||||
|
taos_close(taos);
|
||||||
|
taos_cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
// final results
|
||||||
|
// taos> select * from ctb_0;
|
||||||
|
// ts | b |
|
||||||
|
// =========================================
|
||||||
|
// 2020-06-02 09:17:08.000 | z |
|
||||||
|
// 2020-06-02 09:17:08.001 | b |
|
||||||
|
// 2020-06-02 09:17:08.002 | c |
|
||||||
|
// Query OK, 3 row(s) in set (0.003975s)
|
||||||
|
//
|
||||||
|
// taos> select * from ctb_1;
|
||||||
|
// ts | b |
|
||||||
|
// =========================================
|
||||||
|
// 2020-06-02 09:17:08.000 | z |
|
||||||
|
// 2020-06-02 09:17:08.001 | b |
|
||||||
|
// 2020-06-02 09:17:08.002 | c |
|
||||||
|
// Query OK, 3 row(s) in set (0.007241s)
|
||||||
|
|
||||||
|
// taos> select * from ctb_2;
|
||||||
|
// ts | b |
|
||||||
|
// =========================================
|
||||||
|
// 2020-06-02 09:17:08.000 | z |
|
||||||
|
// 2020-06-02 09:17:08.001 | b |
|
||||||
|
// 2020-06-02 09:17:08.002 | c |
|
||||||
|
// Query OK, 3 row(s) in set (0.005443s)
|
Loading…
Reference in New Issue