fix(util): fix error

This commit is contained in:
Haojun Liao 2024-07-28 15:46:19 +08:00
parent 75c121c18a
commit 848b4aab4f
8 changed files with 135 additions and 62 deletions

View File

@ -2326,13 +2326,13 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
int32_t size = 2048 * 1024; int32_t size = 2048 * 1024;
int32_t code = 0; int32_t code = 0;
char* dumpBuf = *pDataBuf; char* dumpBuf = NULL;
char pBuf[128] = {0}; char pBuf[128] = {0};
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
int32_t len = 0; int32_t len = 0;
*pDataBuf = taosMemoryCalloc(size, 1); dumpBuf = taosMemoryCalloc(size, 1);
if (*pDataBuf == NULL) { if (dumpBuf == NULL) {
return terrno; return terrno;
} }
@ -2442,6 +2442,8 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
if (len >= size - 1) return code; if (len >= size - 1) return code;
} }
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
*pDataBuf = dumpBuf;
return code; return code;
} }
@ -2698,7 +2700,10 @@ int32_t buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId, char**
int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf); int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} else {
*pName = pBuf;
} }
return code; return code;
} }

View File

@ -298,6 +298,7 @@ int32_t buildChildTableName(RandTableName* rName) {
if (sb.buf == NULL) { if (sb.buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArraySort(rName->tags, compareKv); taosArraySort(rName->tags, compareKv);
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) { for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
taosStringBuilderAppendChar(&sb, ','); taosStringBuilderAppendChar(&sb, ',');
@ -305,6 +306,7 @@ int32_t buildChildTableName(RandTableName* rName) {
if (tagKv == NULL) { if (tagKv == NULL) {
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
taosStringBuilderAppendChar(&sb, '='); taosStringBuilderAppendChar(&sb, '=');
if (IS_VAR_DATA_TYPE(tagKv->type)) { if (IS_VAR_DATA_TYPE(tagKv->type)) {
@ -313,6 +315,7 @@ int32_t buildChildTableName(RandTableName* rName) {
taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length); taosStringBuilderAppendStringLen(&sb, (char*)(&(tagKv->value)), tagKv->length);
} }
} }
size_t len = 0; size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len); char* keyJoined = taosStringBuilderGetResult(&sb, &len);
T_MD5_CTX context; T_MD5_CTX context;

View File

@ -192,7 +192,8 @@ TEST_F(StreamTest, kill_checkpoint_trans) {
opt.pWal = pMnode->pWal; opt.pWal = pMnode->pWal;
pMnode->pSdb = sdbInit(&opt); pMnode->pSdb = sdbInit(&opt);
taosThreadMutexInit(&pMnode->syncMgmt.lock, NULL); int32_t code = taosThreadMutexInit(&pMnode->syncMgmt.lock, NULL);
ASSERT(code == 0);
} }
SVgroupChangeInfo info; SVgroupChangeInfo info;
@ -248,7 +249,7 @@ TEST_F(StreamTest, plan_Test) {
if (taosCreateLog("taoslog", 10, "/etc/taos", NULL, NULL, NULL, NULL, 1) != 0) { if (taosCreateLog("taoslog", 10, "/etc/taos", NULL, NULL, NULL, NULL, 1) != 0) {
// ignore create log failed, only print // ignore create log failed, only print
printf(" WARING: Create failed:%s. configDir\n", strerror(errno)); (void) printf(" WARING: Create failed:%s. configDir\n", strerror(errno));
} }
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {

View File

@ -76,8 +76,11 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
ASSERT(code == 0); ASSERT(code == 0);
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pInfo->pBlock, &colInfo); code = blockDataAppendColInfo(pInfo->pBlock, &colInfo);
blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage); ASSERT(code == 0);
code = blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage);
ASSERT(code == 0);
// SColumnInfoData colInfo1 = {0}; // SColumnInfoData colInfo1 = {0};
// colInfo1.info.type = TSDB_DATA_TYPE_BINARY; // colInfo1.info.type = TSDB_DATA_TYPE_BINARY;
@ -109,7 +112,8 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
v = taosRand(); v = taosRand();
} }
colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(&v), false); int32_t code = colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(&v), false);
ASSERT(code == 0);
// sprintf(buf, "this is %d row", i); // sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf); // STR_TO_VARSTR(b1, buf);
@ -137,12 +141,15 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
ASSERT(code == 0); ASSERT(code == 0);
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1);
blockDataAppendColInfo(pInfo->pBlock, &colInfo); int32_t code = blockDataAppendColInfo(pInfo->pBlock, &colInfo);
ASSERT(code == 0);
SColumnInfoData colInfo1 = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); SColumnInfoData colInfo1 = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pInfo->pBlock, &colInfo1); code = blockDataAppendColInfo(pInfo->pBlock, &colInfo1);
ASSERT(code == 0);
blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage); code = blockDataEnsureCapacity(pInfo->pBlock, pInfo->numOfRowsPerPage);
ASSERT(code == 0);
} else { } else {
blockDataCleanup(pInfo->pBlock); blockDataCleanup(pInfo->pBlock);
} }
@ -157,7 +164,8 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
ts = (++pInfo->tsStart); ts = (++pInfo->tsStart);
colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(&ts), false); int32_t code = colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(&ts), false);
ASSERT(code == 0);
SColumnInfoData* pColInfo1 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); SColumnInfoData* pColInfo1 = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 1));
if (pInfo->type == data_desc) { if (pInfo->type == data_desc) {
@ -168,7 +176,8 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
v = taosRand(); v = taosRand();
} }
colDataSetVal(pColInfo1, i, reinterpret_cast<const char*>(&v), false); code = colDataSetVal(pColInfo1, i, reinterpret_cast<const char*>(&v), false);
ASSERT(code == 0);
// sprintf(buf, "this is %d row", i); // sprintf(buf, "this is %d row", i);
// STR_TO_VARSTR(b1, buf); // STR_TO_VARSTR(b1, buf);
@ -182,7 +191,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
pInfo->current += 1; pInfo->current += 1;
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0); int32_t code = blockDataUpdateTsWindow(pBlock, 0);
return pBlock; return pBlock;
} }

View File

@ -42,9 +42,6 @@ TEST(testCase, linear_hash_Tests) {
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
for (int32_t i = 0; i < 1000000; ++i) { for (int32_t i = 0; i < 1000000; ++i) {
if (i == 950000) {
printf("kf\n");
}
char* v = tHashGet(pHashObj, &i, sizeof(i)); char* v = tHashGet(pHashObj, &i, sizeof(i));
if (v != NULL) { if (v != NULL) {
// printf("find value: %d, key:%d\n", *(int32_t*) v, i); // printf("find value: %d, key:%d\n", *(int32_t*) v, i);
@ -54,12 +51,16 @@ TEST(testCase, linear_hash_Tests) {
} }
// tHashPrint(pHashObj, LINEAR_HASH_STATIS); // tHashPrint(pHashObj, LINEAR_HASH_STATIS);
tHashCleanup(pHashObj); int32_t code = tHashCleanup(pHashObj);
ASSERT(code == 0);
int64_t et1 = taosGetTimestampUs(); int64_t et1 = taosGetTimestampUs();
SHashObj* pHashObj1 = taosHashInit(1000, fn, false, HASH_NO_LOCK); SHashObj* pHashObj1 = taosHashInit(1000, fn, false, HASH_NO_LOCK);
ASSERT(pHashObj1 != NULL);
for (int32_t i = 0; i < 1000000; ++i) { for (int32_t i = 0; i < 1000000; ++i) {
taosHashPut(pHashObj1, &i, sizeof(i), &i, sizeof(i)); int32_t code = taosHashPut(pHashObj1, &i, sizeof(i), &i, sizeof(i));
ASSERT(code == 0);
} }
for (int32_t i = 0; i < 1000000; ++i) { for (int32_t i = 0; i < 1000000; ++i) {
@ -68,6 +69,6 @@ TEST(testCase, linear_hash_Tests) {
taosHashCleanup(pHashObj1); taosHashCleanup(pHashObj1);
int64_t et2 = taosGetTimestampUs(); int64_t et2 = taosGetTimestampUs();
printf("linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f\n", (et1 - st) / 1000.0, (et - st) / 1000.0, (void)printf("linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f\n", (et1 - st) / 1000.0, (et - st) / 1000.0,
(et2 - et1) / 1000.0); (et2 - et1) / 1000.0);
} }

View File

@ -78,8 +78,11 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
} }
colInfo.info.colId = 1; colInfo.info.colId = 1;
blockDataAppendColInfo(pBlock, &colInfo); int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
blockDataEnsureCapacity(pBlock, pInfo->pageRows); ASSERT(code == 0);
code = blockDataEnsureCapacity(pBlock, pInfo->pageRows);
ASSERT(code == 0);
for (int32_t i = 0; i < pInfo->pageRows; ++i) { for (int32_t i = 0; i < pInfo->pageRows; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
@ -92,25 +95,31 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
int32_t len = 0; int32_t len = 0;
bool ret = taosMbsToUcs4(strOri, size, (TdUcs4*)varDataVal(str), size * TSDB_NCHAR_SIZE, &len); bool ret = taosMbsToUcs4(strOri, size, (TdUcs4*)varDataVal(str), size * TSDB_NCHAR_SIZE, &len);
if (!ret) { if (!ret) {
printf("error\n"); (void) printf("error\n");
return NULL; return NULL;
} }
varDataSetLen(str, len); varDataSetLen(str, len);
colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(str), false); int32_t code = colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(str), false);
ASSERT(code == 0);
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
printf("nchar: %s\n", strOri); (void) printf("nchar: %s\n", strOri);
} else if (pInfo->type == TSDB_DATA_TYPE_BINARY || pInfo->type == TSDB_DATA_TYPE_GEOMETRY) { } else if (pInfo->type == TSDB_DATA_TYPE_BINARY || pInfo->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t size = taosRand() % VARCOUNT; int32_t size = taosRand() % VARCOUNT;
char str[64] = {0}; char str[64] = {0};
taosRandStr(varDataVal(str), size); taosRandStr(varDataVal(str), size);
varDataSetLen(str, size); varDataSetLen(str, size);
colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(str), false); code = colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(str), false);
ASSERT(code == 0);
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
printf("binary: %s\n", varDataVal(str)); (void) printf("binary: %s\n", varDataVal(str));
} else if (pInfo->type == TSDB_DATA_TYPE_DOUBLE || pInfo->type == TSDB_DATA_TYPE_FLOAT) { } else if (pInfo->type == TSDB_DATA_TYPE_DOUBLE || pInfo->type == TSDB_DATA_TYPE_FLOAT) {
double v = rand_f2(); double v = rand_f2();
colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(&v), false); code = colDataSetVal(pColInfo, i, reinterpret_cast<const char*>(&v), false);
printf("float: %f\n", v); ASSERT(code == 0);
(void) printf("float: %f\n", v);
} else { } else {
int64_t v = ++pInfo->startVal; int64_t v = ++pInfo->startVal;
char* result = static_cast<char*>(taosMemoryCalloc(tDataTypes[pInfo->type].bytes, 1)); char* result = static_cast<char*>(taosMemoryCalloc(tDataTypes[pInfo->type].bytes, 1));
@ -120,8 +129,10 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
memcpy(result, (char*)(&v) + sizeof(int64_t) - tDataTypes[pInfo->type].bytes, tDataTypes[pInfo->type].bytes); memcpy(result, (char*)(&v) + sizeof(int64_t) - tDataTypes[pInfo->type].bytes, tDataTypes[pInfo->type].bytes);
} }
colDataSetVal(pColInfo, i, result, false); code = colDataSetVal(pColInfo, i, result, false);
printf("int: %" PRId64 "\n", v); ASSERT(code == 0);
(void) printf("int: %" PRId64 "\n", v);
taosMemoryFree(result); taosMemoryFree(result);
} }
} }
@ -359,7 +370,8 @@ TEST(testCase, ordered_merge_sort_Test) {
for (int32_t i = 0; i < 1; ++i) { for (int32_t i = 0; i < 1; ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo); code = blockDataAppendColInfo(pBlock, &colInfo);
ASSERT(code == 0);
} }
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc");

View File

@ -43,7 +43,9 @@ TEST(testCase, tSimpleHashTest_intKey) {
int64_t originKeySum = 0; int64_t originKeySum = 0;
for (int64_t i = 1; i <= 100; ++i) { for (int64_t i = 1; i <= 100; ++i) {
originKeySum += i; originKeySum += i;
tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen); code = tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen);
ASSERT(code == 0);
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
} }
@ -68,7 +70,9 @@ TEST(testCase, tSimpleHashTest_intKey) {
ASSERT_EQ(keySum, originKeySum); ASSERT_EQ(keySum, originKeySum);
for (int64_t i = 1; i <= 100; ++i) { for (int64_t i = 1; i <= 100; ++i) {
tSimpleHashRemove(pHashObj, (const void *)&i, keyLen); code = tSimpleHashRemove(pHashObj, (const void *)&i, keyLen);
ASSERT(code == 0);
ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj));
} }
@ -95,7 +99,9 @@ TEST(testCase, tSimpleHashTest_binaryKey) {
for (int64_t i = 1; i <= 100; ++i) { for (int64_t i = 1; i <= 100; ++i) {
combineKey.suid = i; combineKey.suid = i;
combineKey.uid = i + 1; combineKey.uid = i + 1;
tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen); code = tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen);
ASSERT(code == 0);
originDataSum += i; originDataSum += i;
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
} }
@ -120,7 +126,8 @@ TEST(testCase, tSimpleHashTest_binaryKey) {
ASSERT_EQ(originDataSum, dataSum); ASSERT_EQ(originDataSum, dataSum);
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); code = tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
ASSERT(code == 0);
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(data, &kLen); void *key = tSimpleHashGetKey(data, &kLen);
@ -130,7 +137,9 @@ TEST(testCase, tSimpleHashTest_binaryKey) {
for (int64_t i = 1; i <= 99; ++i) { for (int64_t i = 1; i <= 99; ++i) {
combineKey.suid = i; combineKey.suid = i;
combineKey.uid = i + 1; combineKey.uid = i + 1;
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); code = tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
ASSERT(code == 0);
ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj));
} }

View File

@ -69,7 +69,8 @@ void *backendOpen() {
key.ts = ts; key.ts = ts;
const char *val = "value data"; const char *val = "value data";
int32_t vlen = strlen(val); int32_t vlen = strlen(val);
streamStatePut_rocksdb(p, &key, (char *)val, vlen); int32_t code = streamStatePut_rocksdb(p, &key, (char *)val, vlen);
ASSERT(code == 0);
tsArray.push_back(ts); tsArray.push_back(ts);
} }
@ -82,7 +83,9 @@ void *backendOpen() {
const char *val = "value data"; const char *val = "value data";
int32_t len = 0; int32_t len = 0;
char *newVal = NULL; char *newVal = NULL;
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len); int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
ASSERT(code == 0);
ASSERT(len == strlen(val)); ASSERT(len == strlen(val));
} }
int64_t ts = tsArray[0]; int64_t ts = tsArray[0];
@ -90,9 +93,11 @@ void *backendOpen() {
key.groupId = (uint64_t)(0); key.groupId = (uint64_t)(0);
key.ts = ts; key.ts = ts;
streamStateDel_rocksdb(p, &key); int32_t code = streamStateDel_rocksdb(p, &key);
ASSERT(code == 0);
streamStateClear_rocksdb(p); code = streamStateClear_rocksdb(p);
ASSERT(code == 0);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
int64_t ts = tsArray[i]; int64_t ts = tsArray[i];
@ -118,11 +123,12 @@ void *backendOpen() {
const char *val = "value data"; const char *val = "value data";
int32_t vlen = strlen(val); int32_t vlen = strlen(val);
streamStatePut_rocksdb(p, &key, (char *)val, vlen); code = streamStatePut_rocksdb(p, &key, (char *)val, vlen);
ASSERT(code == 0);
} }
SWinKey winkey; SWinKey winkey;
int32_t code = streamStateGetFirst_rocksdb(p, &key); code = streamStateGetFirst_rocksdb(p, &key);
ASSERT(code == 0); ASSERT(code == 0);
ASSERT(key.ts == tsArray[0]); ASSERT(key.ts == tsArray[0]);
@ -151,7 +157,8 @@ void *backendOpen() {
const char *val = "Value"; const char *val = "Value";
int32_t len = strlen(val); int32_t len = strlen(val);
streamStateFuncPut_rocksdb(p, &key, val, len); code = streamStateFuncPut_rocksdb(p, &key, val, len);
ASSERT(code == 0);
} }
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i}; STupleKey key = {0}; //{.groupId = (uint64_t)(0), .ts = tsArray[i], .exprIdx = i};
@ -161,7 +168,9 @@ void *backendOpen() {
char *val = NULL; char *val = NULL;
int32_t len = 0; int32_t len = 0;
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len); int32_t code = streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
ASSERT(code == 0);
ASSERT(len == strlen("Value")); ASSERT(len == strlen("Value"));
} }
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
@ -172,7 +181,8 @@ void *backendOpen() {
char *val = NULL; char *val = NULL;
int32_t len = 0; int32_t len = 0;
streamStateFuncDel_rocksdb(p, &key); int32_t code = streamStateFuncDel_rocksdb(p, &key);
ASSERT(code == 0);
} }
// session put // session put
@ -187,7 +197,8 @@ void *backendOpen() {
const char *val = "Value"; const char *val = "Value";
int32_t len = strlen(val); int32_t len = strlen(val);
streamStateSessionPut_rocksdb(p, &key, val, len); code = streamStateSessionPut_rocksdb(p, &key, val, len);
ASSERT(code == 0);
char *pval = NULL; char *pval = NULL;
ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len)); ASSERT(0 == streamStateSessionGet_rocksdb(p, &key, (void **)&pval, &len));
@ -346,7 +357,9 @@ void *backendOpen() {
ASSERT(code == 0); ASSERT(code == 0);
} }
SArray *result = taosArrayInit(8, sizeof(void *)); SArray *result = taosArrayInit(8, sizeof(void *));
streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result); code = streamDefaultIterGet_rocksdb(p, "tbname", "tbname_99", result);
ASSERT(code == 0);
ASSERT(taosArrayGetSize(result) >= 0); ASSERT(taosArrayGetSize(result) >= 0);
return p; return p;
@ -363,10 +376,14 @@ TEST_F(BackendEnv, checkOpen) {
sprintf(key, "key_%d", i); sprintf(key, "key_%d", i);
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatch(p, "default", (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000); (int32_t)(strlen(val)), tsStart + 100000);
ASSERT(code == 0);
} }
streamStatePutBatch_rocksdb(p, pBatch);
int32_t code = streamStatePutBatch_rocksdb(p, pBatch);
ASSERT(code == 0);
streamStateDestroyBatch(pBatch); streamStateDestroyBatch(pBatch);
} }
{ {
@ -378,14 +395,18 @@ TEST_F(BackendEnv, checkOpen) {
sprintf(key, "key_%d", i); sprintf(key, "key_%d", i);
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
ASSERT(code == 0);
} }
streamStatePutBatch_rocksdb(p, pBatch); int32_t code = streamStatePutBatch_rocksdb(p, pBatch);
ASSERT(code == 0);
streamStateDestroyBatch(pBatch); streamStateDestroyBatch(pBatch);
} }
// do checkpoint 2 // do checkpoint 2
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0); int32_t code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2, 0);
ASSERT(code == 0);
{ {
void *pBatch = streamStateCreateBatch(); void *pBatch = streamStateCreateBatch();
int32_t size = 0; int32_t size = 0;
@ -395,27 +416,37 @@ TEST_F(BackendEnv, checkOpen) {
sprintf(key, "key_%d", i); sprintf(key, "key_%d", i);
char val[128] = {0}; char val[128] = {0};
sprintf(val, "val_%d", i); sprintf(val, "val_%d", i);
streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val, int32_t code = streamStatePutBatchOptimize(p, 0, (rocksdb_writebatch_t *)pBatch, (void *)key, (void *)val,
(int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf); (int32_t)(strlen(val)), tsStart + 100000, (void *)valBuf);
ASSERT(code == 0);
} }
streamStatePutBatch_rocksdb(p, pBatch); code = streamStatePutBatch_rocksdb(p, pBatch);
ASSERT(code == 0);
streamStateDestroyBatch(pBatch); streamStateDestroyBatch(pBatch);
} }
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0); code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 3, 0);
ASSERT(code == 0);
const char *path = "/tmp/backend/stream"; const char *path = "/tmp/backend/stream";
const char *dump = "/tmp/backend/stream/dump"; const char *dump = "/tmp/backend/stream/dump";
// taosMkDir(dump); // taosMkDir(dump);
taosMulMkDir(dump); code = taosMulMkDir(dump);
ASSERT(code == 0);
SBkdMgt *mgt = bkdMgtCreate((char *)path); SBkdMgt *mgt = bkdMgtCreate((char *)path);
SArray *result = taosArrayInit(4, sizeof(void *)); SArray *result = taosArrayInit(4, sizeof(void *));
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump); code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
ASSERT(code == 0);
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0); code = taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4, 0);
ASSERT(code == 0);
taosArrayClear(result); taosArrayClear(result);
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump); code = bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);
ASSERT(code == 0);
bkdMgtDestroy(mgt); bkdMgtDestroy(mgt);
streamStateClose((SStreamState *)p, true); streamStateClose((SStreamState *)p, true);
// { // {
@ -444,7 +475,9 @@ TEST_F(BackendEnv, backendUtil) {
} }
TEST_F(BackendEnv, oldBackendInit) { TEST_F(BackendEnv, oldBackendInit) {
const char *path = "/tmp/backend1"; const char *path = "/tmp/backend1";
taosMulMkDir(path); int32_t code = taosMulMkDir(path);
ASSERT(code == 0);
{ {
SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10); SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10);
streamBackendCleanup((void *)p); streamBackendCleanup((void *)p);