Merge branch '3.0' into feature/compressData

This commit is contained in:
yihaoDeng 2024-04-01 08:04:16 +00:00
commit 53ecfe511a
17 changed files with 3231 additions and 2352 deletions

View File

@ -268,7 +268,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
bool alreadyAddGroupId(char* ctbName); bool alreadyAddGroupId(char* ctbName);
bool isAutoTableName(char* ctbName); bool isAutoTableName(char* ctbName);
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId); void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);

View File

@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM; typedef struct SStreamTaskSM SStreamTaskSM;
#define SSTREAM_TASK_VER 3 #define SSTREAM_TASK_VER 4
#define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3

View File

@ -2188,10 +2188,14 @@ _end:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId) { void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){
char tmp[TSDB_TABLE_NAME_LEN] = {0}; char tmp[TSDB_TABLE_NAME_LEN] = {0};
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%" PRIu64, groupId); if (stbName == NULL){
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
}else{
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId);
}
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
strcat(ctbName, tmp); strcat(ctbName, tmp);
} }
@ -2201,6 +2205,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0
bool alreadyAddGroupId(char* ctbName) { bool alreadyAddGroupId(char* ctbName) {
size_t len = strlen(ctbName); size_t len = strlen(ctbName);
if (len == 0) return false;
size_t _location = len - 1; size_t _location = len - 1;
while (_location > 0) { while (_location > 0) {
if (ctbName[_location] < '0' || ctbName[_location] > '9') { if (ctbName[_location] < '0' || ctbName[_location] > '9') {

View File

@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, innerSz) < 0) return -1; if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j); SStreamTask *pTask = taosArrayGetP(pArray, j);
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
pTask->ver = SSTREAM_TASK_VER; pTask->ver = SSTREAM_TASK_VER;
}
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
} }
} }

View File

@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, NULL, 0); tEncoderInit(&encoder, NULL, 0);
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
pTask->ver = SSTREAM_TASK_VER; pTask->ver = SSTREAM_TASK_VER;
}
tEncodeStreamTask(&encoder, pTask); tEncodeStreamTask(&encoder, pTask);
int32_t size = encoder.pos; int32_t size = encoder.pos;

View File

@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
if (varTbName != NULL && varTbName != (void*)-1) { if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
buildCtbNameAddGroupId(name, groupId); buildCtbNameAddGroupId(stbFullName, name, groupId);
} }
} else if (stbFullName) { } else if (stbFullName) {
name = buildCtbNameByGroupId(stbFullName, groupId); name = buildCtbNameByGroupId(stbFullName, groupId);
@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
int64_t gid, bool newSubTableRule) { int64_t gid, bool newSubTableRule) {
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
buildCtbNameAddGroupId(pCreateTableReq->name, gid); buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
// tqDebug("gen name from:%s", pDataBlock->info.parTbName); // tqDebug("gen name from:%s", pDataBlock->info.parTbName);
} else { } else {
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
@ -672,10 +672,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
} else { } else {
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
!isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { !alreadyAddGroupId(dstTableName) && groupId != 0) {
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
buildCtbNameAddGroupId(dstTableName, groupId); if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, dstTableName, groupId);
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
}
} }
} }

View File

@ -755,16 +755,52 @@ insert_query(A) ::= INSERT INTO full_table_name(C) query_or_subquery(B).
/************************************************ tags_literal *************************************************************/ /************************************************ tags_literal *************************************************************/
tags_literal(A) ::= NK_INTEGER(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); } tags_literal(A) ::= NK_INTEGER(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); }
tags_literal(A) ::= NK_INTEGER(B) NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_INTEGER(B) NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_PLUS(B) NK_INTEGER(C). { tags_literal(A) ::= NK_PLUS(B) NK_INTEGER(C). {
SToken t = B; SToken t = B;
t.n = (C.z + C.n) - B.z; t.n = (C.z + C.n) - B.z;
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL); A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
} }
tags_literal(A) ::= NK_PLUS(B) NK_INTEGER NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_PLUS(B) NK_INTEGER NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_MINUS(B) NK_INTEGER(C). { tags_literal(A) ::= NK_MINUS(B) NK_INTEGER(C). {
SToken t = B; SToken t = B;
t.n = (C.z + C.n) - B.z; t.n = (C.z + C.n) - B.z;
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL); A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
} }
tags_literal(A) ::= NK_MINUS(B) NK_INTEGER NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_MINUS(B) NK_INTEGER NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_FLOAT(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B, NULL); } tags_literal(A) ::= NK_FLOAT(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B, NULL); }
tags_literal(A) ::= NK_PLUS(B) NK_FLOAT(C). { tags_literal(A) ::= NK_PLUS(B) NK_FLOAT(C). {
SToken t = B; SToken t = B;
@ -778,29 +814,113 @@ tags_literal(A) ::= NK_MINUS(B) NK_FLOAT(C).
} }
tags_literal(A) ::= NK_BIN(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); } tags_literal(A) ::= NK_BIN(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); }
tags_literal(A) ::= NK_BIN(B) NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_BIN(B) NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_PLUS(B) NK_BIN(C). { tags_literal(A) ::= NK_PLUS(B) NK_BIN(C). {
SToken t = B; SToken t = B;
t.n = (C.z + C.n) - B.z; t.n = (C.z + C.n) - B.z;
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL); A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
} }
tags_literal(A) ::= NK_PLUS(B) NK_BIN NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_PLUS(B) NK_BIN NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_MINUS(B) NK_BIN(C). { tags_literal(A) ::= NK_MINUS(B) NK_BIN(C). {
SToken t = B; SToken t = B;
t.n = (C.z + C.n) - B.z; t.n = (C.z + C.n) - B.z;
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL); A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
} }
tags_literal(A) ::= NK_MINUS(B) NK_BIN NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_MINUS(B) NK_BIN NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_HEX(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); } tags_literal(A) ::= NK_HEX(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); }
tags_literal(A) ::= NK_HEX(B) NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_HEX(B) NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_PLUS(B) NK_HEX(C). { tags_literal(A) ::= NK_PLUS(B) NK_HEX(C). {
SToken t = B; SToken t = B;
t.n = (C.z + C.n) - B.z; t.n = (C.z + C.n) - B.z;
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL); A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
} }
tags_literal(A) ::= NK_PLUS(B) NK_HEX NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_PLUS(B) NK_HEX NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_MINUS(B) NK_HEX(C). { tags_literal(A) ::= NK_MINUS(B) NK_HEX(C). {
SToken t = B; SToken t = B;
t.n = (C.z + C.n) - B.z; t.n = (C.z + C.n) - B.z;
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL); A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
} }
tags_literal(A) ::= NK_MINUS(B) NK_HEX NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_MINUS(B) NK_HEX NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_STRING(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B, NULL); } tags_literal(A) ::= NK_STRING(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B, NULL); }
tags_literal(A) ::= NK_STRING(B) NK_PLUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_STRING(B) NK_MINUS duration_literal(C). {
SToken l = B;
SToken r = getTokenFromRawExprNode(pCxt, C);
l.n = (r.z + r.n) - l.z;
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
}
tags_literal(A) ::= NK_BOOL(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_BOOL, &B, NULL); } tags_literal(A) ::= NK_BOOL(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_BOOL, &B, NULL); }
tags_literal(A) ::= NULL(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B, NULL); } tags_literal(A) ::= NULL(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B, NULL); }

File diff suppressed because it is too large Load Diff

View File

@ -580,12 +580,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
} else { } else {
char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
if (pDataBlock->info.parTbName[0]) { if (pDataBlock->info.parTbName[0]) {
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && if(pTask->subtableWithoutMd5 != 1 &&
pTask->subtableWithoutMd5 != 1 &&
!isAutoTableName(pDataBlock->info.parTbName) && !isAutoTableName(pDataBlock->info.parTbName) &&
!alreadyAddGroupId(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) &&
groupId != 0){ groupId != 0){
buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId); if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
}
} }
} else { } else {
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);

View File

@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL; void* buf = NULL;
int32_t len; int32_t len;
int32_t code; int32_t code;
pTask->ver = SSTREAM_TASK_VER;
tEncodeSize(tEncodeStreamTask, pTask, len, code); tEncodeSize(tEncodeStreamTask, pTask, len, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return -1; return -1;
} }
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
pTask->ver = SSTREAM_TASK_VER;
}
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len); tEncoderInit(&encoder, buf, len);
tEncodeStreamTask(&encoder, pTask); tEncodeStreamTask(&encoder, pTask);

View File

@ -936,6 +936,9 @@ sql_error alter table st_bool_i1 set tag tagname="123abc"
sql alter table st_bool_i2 set tag tagname="123" sql alter table st_bool_i2 set tag tagname="123"
sql_error alter table st_bool_i3 set tag tagname=abc sql_error alter table st_bool_i3 set tag tagname=abc
sql_error alter table st_bool_i4 set tag tagname="abc" sql_error alter table st_bool_i4 set tag tagname="abc"
sql_error alter table st_bool_i4 set tag tagname=now
sql_error alter table st_bool_i4 set tag tagname=now()+1d
sql_error alter table st_bool_i4 set tag tagname=1+1d
sql_error alter table st_bool_i5 set tag tagname=" " sql_error alter table st_bool_i5 set tag tagname=" "
sql_error alter table st_bool_i6 set tag tagname='' sql_error alter table st_bool_i6 set tag tagname=''

View File

@ -913,6 +913,8 @@ sql_error alter table st_int_e19 set tag tagname=123abc
sql_error alter table st_int_e20 set tag tagname="123abc" sql_error alter table st_int_e20 set tag tagname="123abc"
sql_error alter table st_int_e22 set tag tagname=abc sql_error alter table st_int_e22 set tag tagname=abc
sql_error alter table st_int_e23 set tag tagname="abc" sql_error alter table st_int_e23 set tag tagname="abc"
sql_error alter table st_int_e25 set tag tagname=1+1d
sql_error alter table st_int_e25 set tag tagname="1"+1d
sql_error alter table st_int_e24 set tag tagname=" " sql_error alter table st_int_e24 set tag tagname=" "
sql_error alter table st_int_e25 set tag tagname='' sql_error alter table st_int_e25 set tag tagname=''
sql alter table st_int_e26_1 set tag tagname='123' sql alter table st_int_e26_1 set tag tagname='123'

View File

@ -132,6 +132,77 @@ sql show tags from st_timestamp_22
if $data05 != -1 then if $data05 != -1 then
return -1 return -1
endi endi
sql create table st_timestamp_23 using mt_timestamp tags (1+ 1d )
sql show tags from st_timestamp_23
if $data05 != 86400001 then
return -1
endi
sql create table st_timestamp_24 using mt_timestamp tags (-0 + 1d)
sql show tags from st_timestamp_24
if $data05 != 86400000 then
return -1
endi
sql create table st_timestamp_25 using mt_timestamp tags ("-0" -1s)
sql show tags from st_timestamp_25
if $data05 != -1000 then
return -1
endi
sql create table st_timestamp_26 using mt_timestamp tags (0b01 -1a)
sql show tags from st_timestamp_26
if $data05 != 0 then
return -1
endi
sql create table st_timestamp_27 using mt_timestamp tags (0b01 -1s)
sql show tags from st_timestamp_27
if $data05 != -999 then
return -1
endi
sql create table st_timestamp_28 using mt_timestamp tags ("0x01" +1u)
sql show tags from st_timestamp_28
if $data05 != 1 then
return -1
endi
sql create table st_timestamp_29 using mt_timestamp tags (0x01 +1b)
sql show tags from st_timestamp_29
if $data05 != 1 then
return -1
endi
sql create table st_timestamp_30 using mt_timestamp tags (-0b00 -0a)
sql show tags from st_timestamp_30
if $data05 != 0 then
return -1
endi
sql create table st_timestamp_31 using mt_timestamp tags ("-0x00" +1u)
sql show tags from st_timestamp_31
if $data05 != 0 then
return -1
endi
sql create table st_timestamp_32 using mt_timestamp tags (-0x00 +1b)
sql show tags from st_timestamp_32
if $data05 != 0 then
return -1
endi
sql create table st_timestamp_33 using mt_timestamp tags (now +1b)
sql show tags from st_timestamp_33
if $data05 < 1711883186000 then
return -1
endi
sql create table st_timestamp_34 using mt_timestamp tags ("now()" +1b)
sql show tags from st_timestamp_34
if $data05 < 1711883186000 then
return -1
endi
sql create table st_timestamp_35 using mt_timestamp tags (today() +1d)
sql show tags from st_timestamp_35
if $data05 < 1711883186000 then
return -1
endi
sql create table st_timestamp_36 using mt_timestamp tags ("today()" +1d)
sql show tags from st_timestamp_36
if $data05 < 1711883186000 then
return -1
endi
## case 01: insert values for test column values ## case 01: insert values for test column values
sql insert into st_timestamp_0 values(now,NULL) sql insert into st_timestamp_0 values(now,NULL)
@ -249,6 +320,76 @@ sql select ts, cast(c as bigint) from st_timestamp_22
if $data01 != -1 then if $data01 != -1 then
return -1 return -1
endi endi
sql insert into st_timestamp_23 values(now,1+ 1d )
sql select ts, cast(c as bigint) from st_timestamp_23
if $data01 != 86400001 then
return -1
endi
sql insert into st_timestamp_24 values(now,-0 + 1d)
sql select ts, cast(c as bigint) from st_timestamp_24
if $data01 != 86400000 then
return -1
endi
sql insert into st_timestamp_25 values(now,"-0" -1s)
sql select ts, cast(c as bigint) from st_timestamp_25
if $data01 != -1000 then
return -1
endi
sql insert into st_timestamp_26 values(now,0b01 -1a)
sql select ts, cast(c as bigint) from st_timestamp_26
if $data01 != 0 then
return -1
endi
sql insert into st_timestamp_27 values(now,+0b01 -1s)
sql select ts, cast(c as bigint) from st_timestamp_27
if $data01 != -999 then
return -1
endi
sql insert into st_timestamp_28 values(now,"+0x01" +1u)
sql select ts, cast(c as bigint) from st_timestamp_28
if $data01 != 1 then
return -1
endi
sql insert into st_timestamp_29 values(now,0x01 +1b)
sql select ts, cast(c as bigint) from st_timestamp_29
if $data01 != 1 then
return -1
endi
sql insert into st_timestamp_30 values(now,-0b00 -0a)
sql show tags from st_timestamp_30
if $data05 != 0 then
return -1
endi
sql insert into st_timestamp_31 values(now,"-0x00" +1u)
sql show tags from st_timestamp_31
if $data05 != 0 then
return -1
endi
sql insert into st_timestamp_32 values (now,-0x00 +1b)
sql show tags from st_timestamp_32
if $data05 != 0 then
return -1
endi
sql insert into st_timestamp_33 values(now,now +1b)
sql select ts, cast(c as bigint) from st_timestamp_33
if $data01 < 1711883186000 then
return -1
endi
sql insert into st_timestamp_34 values(now,"now()" +1b)
sql select ts, cast(c as bigint) from st_timestamp_34
if $data01 < 1711883186000 then
return -1
endi
sql insert into st_timestamp_35 values(now,today() +1d)
sql select ts, cast(c as bigint) from st_timestamp_35
if $data01 < 1711883186000 then
return -1
endi
sql insert into st_timestamp_36 values(now,"today()" +1d)
sql select ts, cast(c as bigint) from st_timestamp_36
if $data01 < 1711883186000 then
return -1
endi
## case 02: dynamic create table for test tag values ## case 02: dynamic create table for test tag values
sql insert into st_timestamp_100 using mt_timestamp tags(NULL) values(now, NULL) sql insert into st_timestamp_100 using mt_timestamp tags(NULL) values(now, NULL)
@ -450,6 +591,136 @@ sql select ts, cast(c as bigint) from st_timestamp_1022
if $data01 != -1 then if $data01 != -1 then
return -1 return -1
endi endi
sql insert into st_timestamp_1023 using mt_timestamp tags(+1+1d) values(now,+1+ 1d )
sql show tags from st_timestamp_1023
if $data05 != 86400001 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1023
if $data01 != 86400001 then
return -1
endi
sql insert into st_timestamp_1024 using mt_timestamp tags(-0+1d) values(now,-0 + 1d)
sql show tags from st_timestamp_1024
if $data05 != 86400000 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1024
if $data01 != 86400000 then
return -1
endi
sql insert into st_timestamp_1025 using mt_timestamp tags("-0" -1s) values(now,"-0" -1s)
sql show tags from st_timestamp_1025
if $data05 != -1000 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1025
if $data01 != -1000 then
return -1
endi
sql insert into st_timestamp_1026 using mt_timestamp tags(+0b01-1a) values(now,+0b01 -1a)
sql show tags from st_timestamp_1026
if $data05 != 0 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1026
if $data01 != 0 then
return -1
endi
sql insert into st_timestamp_1027 using mt_timestamp tags(0b01-1s) values(now,0b01 -1s)
sql show tags from st_timestamp_1027
if $data05 != -999 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1027
if $data01 != -999 then
return -1
endi
sql insert into st_timestamp_1028 using mt_timestamp tags("0x01" + 1u) values(now,"0x01" +1u)
sql show tags from st_timestamp_1028
if $data05 != 1 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1028
if $data01 != 1 then
return -1
endi
sql insert into st_timestamp_1029 using mt_timestamp tags(+0x01 +1b) values(now,+0x01 +1b)
sql show tags from st_timestamp_1029
if $data05 != 1 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1029
if $data01 != 1 then
return -1
endi
sql insert into st_timestamp_1030 using mt_timestamp tags (-0b00 -0a) values(now,-0b00 -0a)
sql show tags from st_timestamp_1030
if $data05 != 0 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1030
if $data01 != 0 then
return -1
endi
sql insert into st_timestamp_1031 using mt_timestamp tags ("-0x00" +1u) values(now,"-0x00" +1u)
sql show tags from st_timestamp_1031
if $data05 != 0 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1031
if $data01 != 0 then
return -1
endi
sql insert into st_timestamp_1032 using mt_timestamp tags (-0x00 +1b) values(now,-0x00 +1b)
sql show tags from st_timestamp_1032
if $data05 != 0 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1032
if $data01 != 0 then
return -1
endi
sql insert into st_timestamp_1033 using mt_timestamp tags(now+1b) values(now,now +1b)
sql show tags from st_timestamp_1033
if $data05 < 1711883186000 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1033
if $data01 < 1711883186000 then
return -1
endi
sql insert into st_timestamp_1034 using mt_timestamp tags("now" +1b) values(now,"now()" +1b)
sql show tags from st_timestamp_1034
if $data05 < 1711883186000 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1034
if $data01 < 1711883186000 then
return -1
endi
sql insert into st_timestamp_1035 using mt_timestamp tags(today() + 1d) values(now,today() +1d)
sql show tags from st_timestamp_1035
if $data05 < 1711883186000 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1035
if $data01 < 1711883186000 then
return -1
endi
sql insert into st_timestamp_1036 using mt_timestamp tags("today" +1d) values(now,"today()" +1d)
sql show tags from st_timestamp_1036
if $data05 < 1711883186000 then
return -1
endi
sql select ts, cast(c as bigint) from st_timestamp_1036
if $data01 < 1711883186000 then
return -1
endi
### case 03: alter tag values ### case 03: alter tag values
sql alter table st_timestamp_0 set tag tagname=NULL sql alter table st_timestamp_0 set tag tagname=NULL
@ -567,12 +838,85 @@ sql show tags from st_timestamp_22
if $data05 != -1 then if $data05 != -1 then
return -1 return -1
endi endi
sql alter table st_timestamp_23 set tag tagname=1+ 1d
sql show tags from st_timestamp_23
if $data05 != 86400001 then
return -1
endi
sql alter table st_timestamp_24 set tag tagname=-0 + 1d
sql show tags from st_timestamp_24
if $data05 != 86400000 then
return -1
endi
sql alter table st_timestamp_25 set tag tagname="-0" -1s
sql show tags from st_timestamp_25
if $data05 != -1000 then
return -1
endi
sql alter table st_timestamp_26 set tag tagname=+0b01 -1a
sql show tags from st_timestamp_26
if $data05 != 0 then
return -1
endi
sql alter table st_timestamp_27 set tag tagname=0b01 -1s
sql show tags from st_timestamp_27
if $data05 != -999 then
return -1
endi
sql alter table st_timestamp_28 set tag tagname="0x01" +1u
sql show tags from st_timestamp_28
if $data05 != 1 then
return -1
endi
sql alter table st_timestamp_29 set tag tagname=0x01 +1b
sql show tags from st_timestamp_29
if $data05 != 1 then
return -1
endi
sql alter table st_timestamp_30 set tag tagname==-0b00 -0a
sql show tags from st_timestamp_30
if $data05 != 0 then
return -1
endi
sql alter table st_timestamp_31 set tag tagname="-0x00" +1u
sql show tags from st_timestamp_31
if $data05 != 0 then
return -1
endi
sql alter table st_timestamp_32 set tag tagname=-0x00 +1b
sql show tags from st_timestamp_32
if $data05 != 0 then
return -1
endi
sql alter table st_timestamp_33 set tag tagname=now +1b
sql show tags from st_timestamp_33
if $data05 < 1711883186000 then
return -1
endi
sql alter table st_timestamp_34 set tag tagname="now()" +1b
sql show tags from st_timestamp_34
if $data05 < 1711883186000 then
return -1
endi
sql alter table st_timestamp_35 set tag tagname=today( ) +1d
sql show tags from st_timestamp_35
if $data05 < 1711883186000 then
return -1
endi
sql alter table st_timestamp_36 set tag tagname="today()" +1d
sql show tags from st_timestamp_36
if $data05 < 1711883186000 then
return -1
endi
## case 04: illegal input ## case 04: illegal input
sql_error create table st_timestamp_e0 using mt_timestamp tags (123abc) sql_error create table st_timestamp_e0 using mt_timestamp tags (123abc)
sql_error create table st_timestamp_e0 using mt_timestamp tags ("123abc") sql_error create table st_timestamp_e0 using mt_timestamp tags ("123abc")
sql_error create table st_timestamp_e0 using mt_timestamp tags (abc) sql_error create table st_timestamp_e0 using mt_timestamp tags (abc)
sql_error create table st_timestamp_e0 using mt_timestamp tags ("abc") sql_error create table st_timestamp_e0 using mt_timestamp tags ("abc")
sql_error create table st_timestamp_e0 using mt_timestamp tags (now()+1d+1s)
sql_error create table st_timestamp_e0 using mt_timestamp tags (1+1y)
sql_error create table st_timestamp_e0 using mt_timestamp tags (0x01+1b+1a)
sql_error create table st_timestamp_e0 using mt_timestamp tags (" ") sql_error create table st_timestamp_e0 using mt_timestamp tags (" ")
sql_error create table st_timestamp_e0 using mt_timestamp tags ('') sql_error create table st_timestamp_e0 using mt_timestamp tags ('')
sql_error create table st_timestamp_104 using mt_timestamp tags ("-123.1") sql_error create table st_timestamp_104 using mt_timestamp tags ("-123.1")
@ -590,5 +934,7 @@ sql_error create table st_timestamp_115 using mt_timestamp tags (922337203685477
sql create table st_timestamp_116 using mt_timestamp tags (-9223372036854775808) sql create table st_timestamp_116 using mt_timestamp tags (-9223372036854775808)
sql_error create table st_timestamp_117 using mt_timestamp tags (-9223372036854775809) sql_error create table st_timestamp_117 using mt_timestamp tags (-9223372036854775809)
sql_error insert into st_timestamp_118 using mt_timestamp tags(9223372036854775807) values(9223372036854775807, 9223372036854775807) sql_error insert into st_timestamp_118 using mt_timestamp tags(9223372036854775807) values(9223372036854775807, 9223372036854775807)
sql_error insert into st_timestamp_119 using mt_timestamp tags(1+1s-1s) values(now, now)
sql_error insert into st_timestamp_120 using mt_timestamp tags(1-1s) values(now, now-1s+1d)
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -299,6 +299,8 @@ sql_error create table st_varbinary_1012 using mt_varbinary tags(tRue)
sql_error create table st_varbinary_1013 using mt_varbinary tags(FalsE) sql_error create table st_varbinary_1013 using mt_varbinary tags(FalsE)
sql_error create table st_varbinary_1014 using mt_varbinary tags(noW) sql_error create table st_varbinary_1014 using mt_varbinary tags(noW)
sql_error create table st_varbinary_1015 using mt_varbinary tags(toDay) sql_error create table st_varbinary_1015 using mt_varbinary tags(toDay)
sql_error create table st_varbinary_1016 using mt_varbinary tags(now()+1s)
sql_error create table st_varbinary_1017 using mt_varbinary tags(1+1s)
sql_error insert into st_varbinary_106 using mt_varbinary tags(+0123) values(now, NULL); sql_error insert into st_varbinary_106 using mt_varbinary tags(+0123) values(now, NULL);
sql_error insert into st_varbinary_107 using mt_varbinary tags(-01.23) values(now, NULL); sql_error insert into st_varbinary_107 using mt_varbinary tags(-01.23) values(now, NULL);
sql_error insert into st_varbinary_108 using mt_varbinary tags(+0x01) values(now, NULL); sql_error insert into st_varbinary_108 using mt_varbinary tags(+0x01) values(now, NULL);
@ -309,6 +311,8 @@ sql_error insert into st_varbinary_1012 using mt_varbinary tags(tRue) values(no
sql_error insert into st_varbinary_1013 using mt_varbinary tags(FalsE) values(now, NULL); sql_error insert into st_varbinary_1013 using mt_varbinary tags(FalsE) values(now, NULL);
sql_error insert into st_varbinary_1014 using mt_varbinary tags(noW) values(now, NULL); sql_error insert into st_varbinary_1014 using mt_varbinary tags(noW) values(now, NULL);
sql_error insert into st_varbinary_1015 using mt_varbinary tags(toDay) values(now, NULL); sql_error insert into st_varbinary_1015 using mt_varbinary tags(toDay) values(now, NULL);
sql_error insert into st_varbinary_1016 using mt_varbinary tags(now()+1s) values(now, NULL);
sql_error insert into st_varbinary_1017 using mt_varbinary tags(1+1s) values(now, NULL);
sql_error insert into st_varbinary_106 using mt_varbinary tags(NULL) values(now(), +0123) sql_error insert into st_varbinary_106 using mt_varbinary tags(NULL) values(now(), +0123)
sql_error insert into st_varbinary_107 using mt_varbinary tags(NULL) values(now(), -01.23) sql_error insert into st_varbinary_107 using mt_varbinary tags(NULL) values(now(), -01.23)
sql_error insert into st_varbinary_108 using mt_varbinary tags(NULL) values(now(), +0x01) sql_error insert into st_varbinary_108 using mt_varbinary tags(NULL) values(now(), +0x01)
@ -319,5 +323,7 @@ sql_error insert into st_varbinary_1012 using mt_varbinary tags(NULL) values(no
sql_error insert into st_varbinary_1013 using mt_varbinary tags(NULL) values(now(), FalsE) sql_error insert into st_varbinary_1013 using mt_varbinary tags(NULL) values(now(), FalsE)
sql_error insert into st_varbinary_1014 using mt_varbinary tags(NULL) values(now(), noW) sql_error insert into st_varbinary_1014 using mt_varbinary tags(NULL) values(now(), noW)
sql_error insert into st_varbinary_1015 using mt_varbinary tags(NULL) values(now(), toDay) sql_error insert into st_varbinary_1015 using mt_varbinary tags(NULL) values(now(), toDay)
sql_error insert into st_varbinary_1016 using mt_varbinary tags(NULL) values(now(), now()+1s)
sql_error insert into st_varbinary_1017 using mt_varbinary tags(NULL) values(now(), 1+1s)
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -410,6 +410,17 @@ endi
# case 04: illegal input # case 04: illegal input
sql_error create table st_varchar_100 using mt_varchar tags(now+1d)
sql_error create table st_varchar_101 using mt_varchar tags(toDay+1d)
sql_error create table st_varchar_102 using mt_varchar tags(1+1b)
sql_error create table st_varchar_103 using mt_varchar tags(0x01+1d)
sql_error create table st_varchar_104 using mt_varchar tags(0b01+1s)
sql_error insert into st_varchar_1100 using mt_varchar tags('now') values(now(),now+1d)
sql_error insert into st_varchar_1101 using mt_varchar tags('now') values(now(),toDay+1d)
sql_error insert into st_varchar_1102 using mt_varchar tags('now') values(now(),1+1b)
sql_error insert into st_varchar_1103 using mt_varchar tags('now') values(now(),0x01+1d)
sql_error insert into st_varchar_1104 using mt_varchar tags('now') values(now(),0b01+1s)
sql_error alter table st_varchar_15 set tag tagname=now()+1d
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -78,14 +78,36 @@ class TDTestCase:
tdLog.info(cmd) tdLog.info(cmd)
os.system(cmd) os.system(cmd)
def case1(self):
tdSql.execute(f'create database if not exists d1 vgroups 1')
tdSql.execute(f'use d1')
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT "
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
time.sleep(2)
tdSql.query("select * from sta")
tdSql.checkRows(3)
tdSql.query("select * from stb")
tdSql.checkRows(3)
# run # run
def run(self): def run(self):
self.case1()
# gen data # gen data
random.seed(int(time.time())) random.seed(int(time.time()))
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y") self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
# create stream # create stream
tdSql.execute("use db") tdSql.execute("use db")
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) tdSql.execute("create stream stream3 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
sql = "select count(*) from sta" sql = "select count(*) from sta"
# loop wait max 60s to check count is ok # loop wait max 60s to check count is ok
tdLog.info("loop wait result ...") tdLog.info("loop wait result ...")

View File

@ -0,0 +1,243 @@
# 写一段python代码生成一个JSON串json 串为数组数组长度为10000每个元素为包含4000个key-value对的JSON字符串json 数组里每个元素里的4000个key不相同元素之间使用相同的keykey值为英文单词value 为int值且value 的范围是[0, 256]。把json串紧凑形式写入文件把json串存入parquet文件中把json串写入avro文件中,把json串写入到postgre sql表中表有两列第一列主int类型主键第二列为json类型数组的每个元素写入json类型里
import csv
import json
import os
import random
import string
import time
from faker import Faker
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro
import psycopg2
from psycopg2.extras import Json
def get_dir_size(start_path='.'):
total = 0
for dirpath, dirs, files in os.walk(start_path):
for f in files:
fp = os.path.join(dirpath, f)
# 获取文件大小并累加到total上
total += os.path.getsize(fp)
return total
def to_avro_record(obj):
return {key: value for key, value in obj.items()}
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def generate_random_values(t):
if t == 0:
return random.randint(-255, 256)
elif t == 1:
return random.randint(-2100000000, 2100000000)
elif t == 2:
return random.uniform(-10000.0, 10000.0)
elif t == 3:
return generate_random_string(10)
elif t == 4:
return random.choice([True, False])
def generate_json_object(key_set, value_set):
values = [generate_random_values(t) for t in value_set]
return dict(zip(key_set, values))
def generate_json_array(keys, values, array_length):
return [generate_json_object(keys, values) for _ in range(array_length)]
def write_parquet_file(parquet_file, json_array):
df = pd.DataFrame(json_array)
table = pa.Table.from_pandas(df)
pq.write_table(table, parquet_file + ".parquet")
def write_json_file(json_file, json_array):
with open(json_file + ".json", 'w') as f:
json.dump(json_array, f, separators=(',', ':'))
def generate_avro_schema(k, t):
if t == 0:
return {"name": k, "type": "int", "logicalType": "int"}
elif t == 1:
return {"name": k, "type": "int", "logicalType": "int"}
elif t == 2:
return {"name": k, "type": "float"}
elif t == 3:
return {"name": k, "type": "string"}
elif t == 4:
return {"name": k, "type": "boolean"}
def write_avro_file(avro_file, json_array, keys, values):
k = list(json_array[0].keys())
if keys != k:
raise ValueError("keys and values should have the same length")
avro_schema = {
"type": "record",
"name": "MyRecord",
"fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()]
}
avro_records = [to_avro_record(obj) for obj in json_array]
with open(avro_file + ".avro", 'wb') as f:
fastavro.writer(f, avro_schema, avro_records)
def write_pg_file(json_array):
conn_str = "dbname=mydatabase user=myuser host=localhost"
conn = psycopg2.connect(conn_str)
cur = conn.cursor()
cur.execute("drop table if exists my_table")
conn.commit()
# 创建表(如果不存在)
cur.execute("""
CREATE TABLE IF NOT EXISTS my_table (
id SERIAL PRIMARY KEY,
json_data JSONB
);
""")
conn.commit()
# 执行SQL查询
cur.execute("SELECT count(*) FROM my_table")
# 获取查询结果
rows = cur.fetchall()
# 打印查询结果
for row in rows:
print("rows before:", row[0])
# 插入数据
for idx, json_obj in enumerate(json_array):
# print(json.dumps(json_obj))
cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),))
conn.commit() # 提交事务
# 执行SQL查询
cur.execute("SELECT count(*) FROM my_table")
# 获取查询结果
rows = cur.fetchall()
# 打印查询结果
for row in rows:
print("rows after:", row[0])
# # 执行SQL查询
# cur.execute("SELECT pg_relation_size('my_table')")
# # 获取查询结果
# rows = cur.fetchall()
# # 打印查询结果
# size = 0
# for row in rows:
# size = row[0]
# print("table size:", row[0])
# 关闭游标和连接
cur.close()
conn.close()
def read_parquet_file(parquet_file):
table = pq.read_table(parquet_file + ".parquet")
df = table.to_pandas()
print(df)
def read_avro_file(avg_file):
with open(avg_file + ".avro", 'rb') as f:
reader = fastavro.reader(f)
for record in reader:
print(record)
def read_json_file(csv_file):
with open(csv_file + ".json", 'r') as f:
data = json.load(f)
print(data)
def main():
key_length = 7
key_sizes = 4000
row_sizes = 10000
file_name = "output"
# cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)]
cases = [(2, 2), (3, 3), (0, 4)]
for data in cases:
begin, end = data
print(f"执行类型:{begin}-{end}")
N = 2
for _ in range(N):
t0 = time.time()
keys = [generate_random_string(key_length) for _ in range(key_sizes)]
values = [random.randint(begin, end) for _ in range(key_sizes)]
# 生成JSON数组
json_array = generate_json_array(keys, values, row_sizes)
t1 = time.time()
write_json_file(file_name, json_array)
t2 = time.time()
write_parquet_file(file_name, json_array)
t3 = time.time()
write_avro_file(file_name, json_array, keys, values)
t4 = time.time()
size = write_pg_file(json_array)
t5 = time.time()
print("生成json 速度:", t2 - t0, "文件大小:", os.path.getsize(file_name + ".json"))
print("parquet 速度:", t3 - t2, "文件大小:", os.path.getsize(file_name + ".parquet"))
print("avro 速度:", t4 - t3, "文件大小:", os.path.getsize(file_name + ".avro"))
print("pg json 速度:", t5 - t4, "文件大小:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024)
# read_json_file(file_name)
# read_parquet_file(file_name)
# read_avro_file(file_name)
print(f"\n---------------\n")
if __name__ == "__main__":
main()
# 压缩文件
# import os
#
# import lz4.frame
#
#
# files =["output.json", "output.parquet", "output.avro"]
# def compress_file(input_path, output_path):
# with open(input_path, 'rb') as f_in:
# compressed_data = lz4.frame.compress(f_in.read())
#
# with open(output_path, 'wb') as f_out:
# f_out.write(compressed_data)
#
# for file in files:
# compress_file(file, file + ".lz4")
# print(file, "origin size:", os.path.getsize(file), " after lsz size:", os.path.getsize(file + ".lz4"))