From d4c550db66f8b0a624937307395bc0ca5932bd4f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 10 May 2022 21:35:43 +0800 Subject: [PATCH 01/10] enh(query): add executor plan interface --- source/libs/executor/test/executorTests.cpp | 120 +++++----- source/libs/index/src/indexCache.c | 44 ++-- source/libs/index/src/indexTfile.c | 57 +++-- source/libs/index/test/fstTest.cc | 15 +- source/libs/index/test/jsonUT.cc | 237 ++++++++++++++++++++ 5 files changed, 368 insertions(+), 105 deletions(-) diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index f534cf0917..5ed960af82 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -23,34 +23,34 @@ #pragma GCC diagnostic ignored "-Wsign-compare" #include "os.h" -#include "tglobal.h" +#include "executor.h" #include "executorimpl.h" #include "function.h" -#include "taos.h" -#include "tdef.h" -#include "tvariant.h" -#include "tdatablock.h" -#include "trpc.h" #include "stub.h" -#include "executor.h" +#include "taos.h" +#include "tdatablock.h" +#include "tdef.h" +#include "tglobal.h" #include "tmsg.h" #include "tname.h" +#include "trpc.h" +#include "tvariant.h" namespace { enum { data_rand = 0x1, - data_asc = 0x2, + data_asc = 0x2, data_desc = 0x3, }; typedef struct SDummyInputInfo { - int32_t totalPages; // numOfPages + int32_t totalPages; // numOfPages int32_t current; int32_t startVal; int32_t type; int32_t numOfRowsPerPage; - int32_t numOfCols; // number of columns + int32_t numOfCols; // number of columns int64_t tsStart; SSDataBlock* pBlock; } SDummyInputInfo; @@ -75,26 +75,26 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo); -// SColumnInfoData colInfo1 = {0}; -// colInfo1.info.type = TSDB_DATA_TYPE_BINARY; -// colInfo1.info.bytes = 40; -// colInfo1.info.colId = 2; -// -// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t); -// colInfo1.varmeta.length = 0; -// colInfo1.varmeta.offset = static_cast(taosMemoryCalloc(1, numOfRows * sizeof(int32_t))); -// -// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); + // SColumnInfoData colInfo1 = {0}; + // colInfo1.info.type = TSDB_DATA_TYPE_BINARY; + // colInfo1.info.bytes = 40; + // colInfo1.info.colId = 2; + // + // colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t); + // colInfo1.varmeta.length = 0; + // colInfo1.varmeta.offset = static_cast(taosMemoryCalloc(1, numOfRows * sizeof(int32_t))); + // + // taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); } else { blockDataCleanup(pInfo->pBlock); } SSDataBlock* pBlock = pInfo->pBlock; - char buf[128] = {0}; - char b1[128] = {0}; + char buf[128] = {0}; + char b1[128] = {0}; int32_t v = 0; - for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) { + for (int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); if (pInfo->type == data_desc) { @@ -107,11 +107,11 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { colDataAppend(pColInfo, i, reinterpret_cast(&v), false); -// sprintf(buf, "this is %d row", i); -// STR_TO_VARSTR(b1, buf); -// -// SColumnInfoData* pColInfo2 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); -// colDataAppend(pColInfo2, i, b1, false); + // sprintf(buf, "this is %d row", i); + // STR_TO_VARSTR(b1, buf); + // + // SColumnInfoData* pColInfo2 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); + // colDataAppend(pColInfo2, i, b1, false); } pBlock->info.rows = pInfo->numOfRowsPerPage; @@ -137,7 +137,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { colInfo.info.bytes = sizeof(int64_t); colInfo.info.colId = 1; colInfo.pData = static_cast(taosMemoryCalloc(pInfo->numOfRowsPerPage, sizeof(int64_t))); -// colInfo.nullbitmap = static_cast(taosMemoryCalloc(1, (pInfo->numOfRowsPerPage + 7) / 8)); + // colInfo.nullbitmap = static_cast(taosMemoryCalloc(1, (pInfo->numOfRowsPerPage + 7) / 8)); taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo); @@ -156,11 +156,11 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pInfo->pBlock; - char buf[128] = {0}; - char b1[128] = {0}; + char buf[128] = {0}; + char b1[128] = {0}; int64_t ts = 0; - int32_t v = 0; - for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) { + int32_t v = 0; + for (int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); ts = (++pInfo->tsStart); @@ -177,11 +177,11 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { colDataAppend(pColInfo1, i, reinterpret_cast(&v), false); -// sprintf(buf, "this is %d row", i); -// STR_TO_VARSTR(b1, buf); -// -// SColumnInfoData* pColInfo2 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); -// colDataAppend(pColInfo2, i, b1, false); + // sprintf(buf, "this is %d row", i); + // STR_TO_VARSTR(b1, buf); + // + // SColumnInfoData* pColInfo2 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); + // colDataAppend(pColInfo2, i, b1, false); } pBlock->info.rows = pInfo->numOfRowsPerPage; @@ -191,10 +191,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { blockDataUpdateTsWindow(pBlock); return pBlock; - } -SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, int32_t numOfCols) { +SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type, + int32_t numOfCols) { SOperatorInfo* pOperator = static_cast(taosMemoryCalloc(1, sizeof(SOperatorInfo))); pOperator->name = "dummyInputOpertor4Test"; @@ -204,24 +204,25 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->fpSet.getNextFn = get2ColsDummyBlock; } - SDummyInputInfo *pInfo = (SDummyInputInfo*) taosMemoryCalloc(1, sizeof(SDummyInputInfo)); + SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo)); pInfo->totalPages = numOfBlocks; - pInfo->startVal = startVal; + pInfo->startVal = startVal; pInfo->numOfRowsPerPage = rowsPerPage; - pInfo->type = type; - pInfo->tsStart = 1620000000000; + pInfo->type = type; + pInfo->tsStart = 1620000000000; pOperator->info = pInfo; return pOperator; } -} +} // namespace int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } TEST(testCase, build_executor_tree_Test) { - const char* msg = "{\n" + const char* msg = + "{\n" " \"NodeType\": \"48\",\n" " \"Name\": \"PhysiSubplan\",\n" " \"PhysiSubplan\": {\n" @@ -938,16 +939,19 @@ TEST(testCase, build_executor_tree_Test) { SExecTaskInfo* pTaskInfo = nullptr; DataSinkHandle sinkHandle = nullptr; - SReadHandle handle = { reinterpret_cast(0x1), reinterpret_cast(0x1), NULL }; + SReadHandle handle = {reinterpret_cast(0x1), reinterpret_cast(0x1), NULL}; - struct SSubplan *plan = NULL; - int32_t code = qStringToSubplan(msg, &plan); + struct SSubplan* plan = NULL; + int32_t code = qStringToSubplan(msg, &plan); ASSERT_EQ(code, 0); - code = qCreateExecTask(&handle, 2, 1, plan, (void**) &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); + code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); ASSERT_EQ(code, 0); } - +TEST(testCase, index_plan_test) { + // add later + EXPECT_EQ(0, 0); +} #if 0 TEST(testCase, inMem_sort_Test) { @@ -983,19 +987,19 @@ TEST(testCase, inMem_sort_Test) { typedef struct su { int32_t v; - char *c; + char* c; } su; int32_t cmp(const void* p1, const void* p2) { - su* v1 = (su*) p1; - su* v2 = (su*) p2; + su* v1 = (su*)p1; + su* v2 = (su*)p2; - int32_t x1 = *(int32_t*) v1->c; - int32_t x2 = *(int32_t*) v2->c; + int32_t x1 = *(int32_t*)v1->c; + int32_t x2 = *(int32_t*)v2->c; if (x1 == x2) { return 0; } else { - return x1 < x2? -1:1; + return x1 < x2 ? -1 : 1; } } @@ -1228,4 +1232,4 @@ TEST(testCase, time_interval_Operator_Test) { } #endif -#pragma GCC diagnosti \ No newline at end of file +#pragma GCC diagnosti diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 0653c1d1fa..5294ac8c19 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -278,9 +278,9 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - printf("json val: %s\n", c->colVal); - if (0 != strncmp(c->colVal, term->colName, term->nColName)) { - continue; + // printf("json val: %s\n", c->colVal); + if (0 != strncmp(c->colVal, pCt->colVal, skip)) { + break; } TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType); @@ -640,30 +640,30 @@ static int indexFindCh(char* a, char c) { return p - a; } static int indexCacheJsonTermCompareImpl(char* a, char* b) { - int alen = indexFindCh(a, '&'); - int blen = indexFindCh(b, '&'); + // int alen = indexFindCh(a, '&'); + // int blen = indexFindCh(b, '&'); - int cmp = strncmp(a, b, MIN(alen, blen)); - if (cmp == 0) { - cmp = alen - blen; - if (cmp != 0) { - return cmp; - } - cmp = *(a + alen) - *(b + blen); - if (cmp != 0) { - return cmp; - } - alen += 2; - blen += 2; - cmp = strcmp(a + alen, b + blen); - } - return cmp; + // int cmp = strncmp(a, b, MIN(alen, blen)); + // if (cmp == 0) { + // cmp = alen - blen; + // if (cmp != 0) { + // return cmp; + // } + // cmp = *(a + alen) - *(b + blen); + // if (cmp != 0) { + // return cmp; + // } + // alen += 2; + // blen += 2; + // cmp = strcmp(a + alen, b + blen); + //} + return 0; } static int32_t indexCacheJsonTermCompare(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; // compare colVal - int cmp = indexCacheJsonTermCompareImpl(lt->colVal, rt->colVal); + int32_t cmp = strcmp(lt->colVal, rt->colVal); if (cmp == 0) { return rt->version - lt->version; } @@ -704,6 +704,8 @@ static bool indexCacheIteratorNext(Iterate* itera) { iv->type = ct->operaType; iv->ver = ct->version; iv->colVal = tstrdup(ct->colVal); + // printf("col Val: %s\n", iv->colVal); + // iv->colType = cv->colType; taosArrayPush(iv->val, &ct->uid); } diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index c56d65fc6a..4cc2a4975f 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -334,7 +334,12 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { FstSlice* s = &rt->data; char* ch = (char*)fstSliceData(s, NULL); - TExeCond cond = cmpFn(ch, p, tem->colType); + // if (0 != strncmp(ch, tem->colName, tem->nColName)) { + // swsResultDestroy(rt); + // break; + //} + + TExeCond cond = cmpFn(ch, p, tem->colType); if (MATCH == cond) { tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); } else if (CONTINUE == cond) { @@ -455,16 +460,22 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); - FstSlice h = fstSliceCreate((uint8_t*)p, skip); - fstStreamBuilderSetRange(sb, &h, ctype); - fstSliceDestroy(&h); + // FstSlice h = fstSliceCreate((uint8_t*)p, skip); + // fstStreamBuilderSetRange(sb, &h, ctype); + // fstSliceDestroy(&h); StreamWithState* st = streamBuilderIntoStream(sb); StreamWithStateResult* rt = NULL; while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { FstSlice* s = &rt->data; - char* ch = (char*)fstSliceData(s, NULL); - TExeCond cond = cmpFn(ch, p, tem->colType); + + char* ch = (char*)fstSliceData(s, NULL); + if (0 != strncmp(ch, p, skip)) { + swsResultDestroy(rt); + break; + } + + TExeCond cond = cmpFn(ch + skip, tem->colVal, tem->colType); if (MATCH == cond) { tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); } else if (CONTINUE == cond) { @@ -594,13 +605,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { if (tfileWriteData(tw, v) != 0) { indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); + // printf("write faile\n"); } else { + // printf("write sucee\n"); // indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, // (int)taosArrayGetSize(v->tableId)); // indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx)); } } + fstBuilderFinish(tw->fb); fstBuilderDestroy(tw->fb); tw->fb = NULL; @@ -845,18 +859,24 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { uint8_t colType = header->colType; colType = INDEX_TYPE_GET_TYPE(colType); - if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { - FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); - if (fstBuilderInsert(write->fb, key, tval->offset)) { - fstSliceDestroy(&key); - return 0; - } + FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); + if (fstBuilderInsert(write->fb, key, tval->offset)) { fstSliceDestroy(&key); - return -1; - } else { - // handle other type later + return 0; } - return 0; + return -1; + + // if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { + // FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); + // if (fstBuilderInsert(write->fb, key, tval->offset)) { + // fstSliceDestroy(&key); + // return 0; + // } + // fstSliceDestroy(&key); + // return -1; + //} else { + // // handle other type later + //} } static int tfileWriteFooter(TFileWriter* write) { char buf[sizeof(tfileMagicNumber) + 1] = {0}; @@ -913,8 +933,9 @@ static int tfileReaderLoadFst(TFileReader* reader) { static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { // TODO(yihao): opt later WriterCtx* ctx = reader->ctx; - char block[1024] = {0}; - int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset); + // add block cache + char block[1024] = {0}; + int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset); assert(nread >= sizeof(uint32_t)); char* p = block; diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index bfe1bb21bf..0af82c9175 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -272,9 +272,8 @@ void checkFstCheckIterator1() { std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; - fw->Put("Hello world", 1); - fw->Put("Hello worle", 2); - fw->Put("hello worlf", 4); + fw->Put("test1&^D&10", 1); + fw->Put("test2&^D&10", 2); delete fw; FstReadMemory* m = new FstReadMemory(1024 * 64); @@ -645,11 +644,11 @@ int main(int argc, char* argv[]) { // iterTFileReader(argv[1], argv[2], argv[3], argv[4]); //} checkFstCheckIterator1(); - checkFstCheckIterator2(); - checkFstCheckIteratorPrefix(); - checkFstCheckIteratorRange1(); - checkFstCheckIteratorRange2(); - checkFstCheckIteratorRange3(); + // checkFstCheckIterator2(); + // checkFstCheckIteratorPrefix(); + // checkFstCheckIteratorRange1(); + // checkFstCheckIteratorRange2(); + // checkFstCheckIteratorRange3(); // checkFstLongTerm(); // checkFstPrefixSearch(); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e5692b98f9..5f471dba65 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -181,3 +181,240 @@ TEST_F(JsonEnv, testWriteMillonData) { } } } +TEST_F(JsonEnv, testWriteJsonNumberData) { + { + std::string colName("test"); + std::string colVal("10"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test2"); + std::string colVal("20"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test2"); + std::string colVal("15"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test2"); + std::string colVal("15"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_TERM); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(1000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(1000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(1000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } +} + +TEST_F(JsonEnv, testWriteJsonTfileAndCache) { + { + std::string colName("test1"); + std::string colVal("10"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test"); + std::string colVal("xxxxxxxxxxxxxxxxxxx"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 100000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("test1"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_TERM); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(1000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test1"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test1"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(1000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test1"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test1"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(1000, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } + { + std::string colName("test1"); + std::string colVal("10"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), + colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN); + tIndexJsonSearch(index, mq, result); + EXPECT_EQ(0, taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } +} From 2403f5294c389584048ae54f768091d5f9abe30d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 10 May 2022 21:37:18 +0800 Subject: [PATCH 02/10] enh(query): add executor plan interface --- source/libs/transport/inc/transComm.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 21af35e8f7..e4e79ccf47 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -105,8 +105,8 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) -#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit -#define TRANS_RETRY_INTERVAL 5 // ms retry interval +#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit +#define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_CONN_TIMEOUT 3 // connect timeout typedef struct { From 8c89c41d0a6dc15365bcf2ac796bc2e1585038ef Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 10 May 2022 23:20:35 +0800 Subject: [PATCH 03/10] fix(sync): display OFFLINE when show vgroups --- source/dnode/mnode/impl/src/mndVgroup.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 48a475ec63..4cc65579d5 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -682,7 +682,11 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock * colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false); char buf1[20] = {0}; - const char *role = syncStr(pVgroup->vnodeGid[i].role); + SDnodeObj *pDnodeObj = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId); + ASSERT(pDnodeObj != NULL); + bool isOffLine = !mndIsDnodeOnline(pMnode, pDnodeObj, taosGetTimestampMs()); + const char *role = isOffLine ? "OFFLINE" : syncStr(pVgroup->vnodeGid[i].role); + STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); From ac460915d6e91d611b62088645f0ccf4660f99cc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 10 May 2022 23:47:44 +0800 Subject: [PATCH 04/10] fix(tmq): show --- source/client/src/tmq.c | 11 +++++++++-- source/common/src/systable.c | 4 ++-- source/dnode/mnode/impl/src/mndConsumer.c | 17 ++++++++++++----- source/dnode/mnode/impl/src/mndSubscribe.c | 21 ++++++++++++++------- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c768e001c5..0c73e000a8 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1307,8 +1307,15 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { - // TODO - return TMQ_RESP_ERR__SUCCESS; + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } } const char* tmq_err2str(tmq_resp_err_t err) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5ff0282c87..81682bb734 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -262,7 +262,7 @@ static const SSysDbTableSchema topicSchema[] = { static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, @@ -275,7 +275,7 @@ static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6c77c379e0..155ea6ae93 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -684,6 +684,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; + // TODO: remove + /*if (taosArrayGetSize(pOldConsumer->assignedTopics) == 0) {*/ + /*}*/ } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; @@ -789,6 +792,10 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); if (pShow->pIter == NULL) break; + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { + sdbRelease(pSdb, pConsumer); + continue; + } taosRLockLatch(&pConsumer->lock); @@ -810,12 +817,12 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); - // group id - char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(groupId, strlen(varDataVal(groupId))); + // consumer group + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); // app id char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c947a1913e..2a81f28edd 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -171,14 +171,21 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM return 0; } -static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) { +static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) { int32_t i = 0; while (key[i] != TMQ_SEPARATOR) { i++; } memcpy(cgroup, key, i); cgroup[i] = 0; - strcpy(topic, &key[i + 1]); + if (fullName) { + strcpy(topic, &key[i + 1]); + } else { + while (key[i] != '.') { + i++; + } + strcpy(topic, &key[i + 1]); + } return 0; } @@ -426,7 +433,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO pConsumerNew->updateType = CONSUMER_UPDATE__ADD; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup); + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebNewTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { @@ -444,7 +451,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup); + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { @@ -494,7 +501,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pRebInfo->key, topic, cgroup); + mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); ASSERT(pTopic); taosRLockLatch(&pTopic->lock); @@ -747,7 +754,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); @@ -790,7 +797,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); From 32e6557353733dad422a0a016edb392dfcc6c66f Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Wed, 11 May 2022 01:53:30 +0800 Subject: [PATCH 05/10] fix(os): add print trace func. --- cmake/addr2line_CMakeLists.txt.in | 12 ++++ cmake/cmake.options | 6 ++ cmake/libdwarf_CMakeLists.txt.in | 12 ++++ contrib/CMakeLists.txt | 49 +++++++++++++- include/os/osMemory.h | 1 + source/os/CMakeLists.txt | 16 ++++- source/os/src/osMemory.c | 102 +++++++++++++++++++++++++++++- 7 files changed, 191 insertions(+), 7 deletions(-) create mode 100644 cmake/addr2line_CMakeLists.txt.in create mode 100644 cmake/libdwarf_CMakeLists.txt.in diff --git a/cmake/addr2line_CMakeLists.txt.in b/cmake/addr2line_CMakeLists.txt.in new file mode 100644 index 0000000000..e514764af8 --- /dev/null +++ b/cmake/addr2line_CMakeLists.txt.in @@ -0,0 +1,12 @@ + +# addr2line +ExternalProject_Add(addr2line + GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git + GIT_TAG master + SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line" + BINARY_DIR "${TD_CONTRIB_DIR}/addr2line" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" + ) diff --git a/cmake/cmake.options b/cmake/cmake.options index f32c5acdd1..a60f5c7282 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -48,6 +48,12 @@ IF(${TD_WINDOWS}) ENDIF () +option( + BUILD_ADDR2LINE + "If build addr2line" + OFF + ) + option( BUILD_TEST "If build unit tests using googletest" diff --git a/cmake/libdwarf_CMakeLists.txt.in b/cmake/libdwarf_CMakeLists.txt.in new file mode 100644 index 0000000000..7de34cfbaa --- /dev/null +++ b/cmake/libdwarf_CMakeLists.txt.in @@ -0,0 +1,12 @@ + +# libdwarf +ExternalProject_Add(libdwarf + GIT_REPOSITORY https://github.com/davea42/libdwarf-code.git + GIT_TAG libdwarf-0.3.1 + SOURCE_DIR "${TD_CONTRIB_DIR}/libdwarf" + BINARY_DIR "${TD_CONTRIB_DIR}/libdwarf" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" + ) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index df69eb8aa1..926fbc8957 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -98,6 +98,12 @@ if(${BUILD_WITH_NURAFT}) cat("${TD_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_NURAFT}) +# addr2line +if(${BUILD_ADDR2LINE}) + cat("${TD_SUPPORT_DIR}/libdwarf_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) + cat("${TD_SUPPORT_DIR}/addr2line_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif(${BUILD_ADDR2LINE}) + # download dependencies configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt") execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" . @@ -327,7 +333,48 @@ if(${BUILD_WITH_SQLITE}) endif(NOT TD_WINDOWS) endif(${BUILD_WITH_SQLITE}) -# pthread +# addr2line +if(${BUILD_ADDR2LINE}) + check_include_file( "sys/types.h" HAVE_SYS_TYPES_H) + check_include_file( "sys/stat.h" HAVE_SYS_STAT_H ) + check_include_file( "inttypes.h" HAVE_INTTYPES_H ) + check_include_file( "stddef.h" HAVE_STDDEF_H ) + check_include_file( "stdlib.h" HAVE_STDLIB_H ) + check_include_file( "string.h" HAVE_STRING_H ) + check_include_file( "memory.h" HAVE_MEMORY_H ) + check_include_file( "strings.h" HAVE_STRINGS_H ) + check_include_file( "stdint.h" HAVE_STDINT_H ) + check_include_file( "unistd.h" HAVE_UNISTD_H ) + check_include_file( "sgidefs.h" HAVE_SGIDEFS_H ) + check_include_file( "stdafx.h" HAVE_STDAFX_H ) + check_include_file( "elf.h" HAVE_ELF_H ) + check_include_file( "libelf.h" HAVE_LIBELF_H ) + check_include_file( "libelf/libelf.h" HAVE_LIBELF_LIBELF_H) + check_include_file( "alloca.h" HAVE_ALLOCA_H ) + check_include_file( "elfaccess.h" HAVE_ELFACCESS_H) + check_include_file( "sys/elf_386.h" HAVE_SYS_ELF_386_H ) + check_include_file( "sys/elf_amd64.h" HAVE_SYS_ELF_AMD64_H) + check_include_file( "sys/elf_sparc.h" HAVE_SYS_ELF_SPARC_H) + check_include_file( "sys/ia64/elf.h" HAVE_SYS_IA64_ELF_H ) + set(VERSION 0.3.1) + set(PACKAGE_VERSION "\"${VERSION}\"") + configure_file(libdwarf/cmake/config.h.cmake config.h) + file(GLOB_RECURSE LIBDWARF_SOURCES "libdwarf/src/lib/libdwarf/*.c") + add_library(libdwarf STATIC ${LIBDWARF_SOURCES}) + set_target_properties(libdwarf PROPERTIES OUTPUT_NAME "libdwarf") + if(HAVE_LIBELF_H OR HAVE_LIBELF_LIBELF_H) + target_link_libraries(libdwarf PUBLIC libelf) + endif() + target_include_directories(libdwarf SYSTEM PUBLIC "libdwarf/src/lib/libdwarf" ${CMAKE_BINARY_DIR}/contrib) + file(READ "addr2line/addr2line.c" ADDR2LINE_CONTENT) + string(REPLACE "static int" "int" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}") + string(REPLACE "static void" "void" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}") + string(REPLACE "main(" "main_addr2line(" ADDR2LINE_CONTENT "${ADDR2LINE_CONTENT}") + file(WRITE "addr2line/addr2line.c" "${ADDR2LINE_CONTENT}") + add_library(addr2line STATIC "addr2line/addr2line.c") + target_link_libraries(addr2line PUBLIC libdwarf dl z) + target_include_directories(addr2line PUBLIC "libdwarf/src/lib/libdwarf" ) +endif(${BUILD_ADDR2LINE}) # ================================================================================================ diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 92d9319d5c..ba69a32941 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -35,6 +35,7 @@ void *taosMemoryRealloc(void *ptr, int32_t size); void *taosMemoryStrDup(void *ptr); void taosMemoryFree(void *ptr); int32_t taosMemorySize(void *ptr); +void taosPrintBackTrace(); #define taosMemoryFreeClear(ptr) \ do { \ diff --git a/source/os/CMakeLists.txt b/source/os/CMakeLists.txt index d709bbbcc2..ad6cfc8b95 100644 --- a/source/os/CMakeLists.txt +++ b/source/os/CMakeLists.txt @@ -17,15 +17,25 @@ endif () if(USE_TD_MEMORY) add_definitions(-DUSE_TD_MEMORY) endif () +if(BUILD_ADDR2LINE) + target_include_directories( + os + PUBLIC "${TD_SOURCE_DIR}/contrib/libdwarf/src/lib/libdwarf" + ) + add_definitions(-DUSE_ADDR2LINE) + target_link_libraries( + os PUBLIC addr2line dl z + ) +endif () target_link_libraries( - os pthread + os PUBLIC pthread ) if(NOT TD_WINDOWS) target_link_libraries( - os dl m rt + os PUBLIC dl m rt ) else() target_link_libraries( - os ws2_32 iconv msvcregex wcwidth winmm + os PUBLIC ws2_32 iconv msvcregex wcwidth winmm ) endif(NOT TD_WINDOWS) diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 73c37c28f7..b1b91699a6 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -17,7 +17,7 @@ #include #include "os.h" -#ifdef USE_TD_MEMORY +#if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE) #define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S') @@ -71,14 +71,110 @@ int32_t taosBackTrace(void **buffer, int32_t size) { return frame; } -#endif - // char **taosBackTraceSymbols(int32_t *size) { // void *buffer[20] = {NULL}; // *size = taosBackTrace(buffer, 20); // return backtrace_symbols(buffer, *size); // } +#ifdef USE_ADDR2LINE + +#include "osThread.h" +#include "libdwarf.h" +#include "dwarf.h" + +#define DW_PR_DUu "llu" + +typedef struct lookup_table +{ + Dwarf_Line *table; + Dwarf_Line_Context *ctxts; + int cnt; + Dwarf_Addr low; + Dwarf_Addr high; +} lookup_tableT; + +extern int create_lookup_table(Dwarf_Debug dbg, lookup_tableT *lookup_table); +extern void delete_lookup_table(lookup_tableT *lookup_table); + +size_t addr = 0; +lookup_tableT lookup_table; +Dwarf_Debug tDbg; +static TdThreadOnce traceThreadInit = PTHREAD_ONCE_INIT; + +void endTrace() { + delete_lookup_table(&lookup_table); + dwarf_finish(tDbg); +} +void startTrace() { + int ret; + Dwarf_Ptr errarg = 0; + + FILE *fp = fopen("/proc/self/maps", "r"); + fscanf(fp, "%lx-", &addr); + fclose(fp); + + ret = dwarf_init_path("/proc/self/exe", NULL, 0, DW_GROUPNUMBER_ANY, NULL, errarg, &tDbg, NULL); + if (ret == DW_DLV_NO_ENTRY) { + printf("Unable to open file"); + return; + } + + ret = create_lookup_table(tDbg, &lookup_table); + if (ret != DW_DLV_OK) { + printf("Unable to create lookup table"); + return; + } + atexit(endTrace); +} +static void print_line(Dwarf_Debug dbg, Dwarf_Line line, Dwarf_Addr pc) { + char *linesrc = "??"; + Dwarf_Unsigned lineno = 0; + + if (line) { + dwarf_linesrc(line, &linesrc, NULL); + dwarf_lineno(line, &lineno, NULL); + } + printf("%s:%" DW_PR_DUu "\n", linesrc, lineno); + if (line) dwarf_dealloc(dbg, linesrc, DW_DLA_STRING); +} +void taosPrintBackTrace() { + int size = 20; + void **buffer[20]; + Dwarf_Addr pc; + int32_t frame = 0; + void **ebp; + void **ret = NULL; + size_t func_frame_distance = 0; + + taosThreadOnce(&traceThreadInit, startTrace); + + if (buffer != NULL && size > 0) { + ebp = taosGetEbp(); + func_frame_distance = (size_t)*ebp - (size_t)ebp; + while (ebp && frame < size && (func_frame_distance < (1ULL << 24)) && (func_frame_distance > 0)) { + ret = ebp + 1; + buffer[frame++] = *ret; + ebp = (void **)(*ebp); + func_frame_distance = (size_t)*ebp - (size_t)ebp; + } + for (size_t i = 0; i < frame; i++) { + pc = (size_t)buffer[i] - addr; + if (pc > 0) { + if (pc >= lookup_table.low && pc < lookup_table.high) { + Dwarf_Line line = lookup_table.table[pc - lookup_table.low]; + if (line) print_line(tDbg, line, pc); + } + } + } + } +} +#endif +#endif +#endif + +#ifndef USE_ADDR2LINE +void taosPrintBackTrace() { return; } #endif void *taosMemoryMalloc(int32_t size) { From 42204611ba9df581a52b192960877cf42f839c62 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Wed, 11 May 2022 02:00:09 +0800 Subject: [PATCH 06/10] fix(os): add print trace func. --- source/os/src/osMemory.c | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index b1b91699a6..3400f8c516 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -103,8 +103,10 @@ Dwarf_Debug tDbg; static TdThreadOnce traceThreadInit = PTHREAD_ONCE_INIT; void endTrace() { + if (traceThreadInit != PTHREAD_ONCE_INIT) { delete_lookup_table(&lookup_table); dwarf_finish(tDbg); + } } void startTrace() { int ret; @@ -128,15 +130,15 @@ void startTrace() { atexit(endTrace); } static void print_line(Dwarf_Debug dbg, Dwarf_Line line, Dwarf_Addr pc) { - char *linesrc = "??"; - Dwarf_Unsigned lineno = 0; + char *linesrc = "??"; + Dwarf_Unsigned lineno = 0; - if (line) { - dwarf_linesrc(line, &linesrc, NULL); - dwarf_lineno(line, &lineno, NULL); - } - printf("%s:%" DW_PR_DUu "\n", linesrc, lineno); - if (line) dwarf_dealloc(dbg, linesrc, DW_DLA_STRING); + if (line) { + dwarf_linesrc(line, &linesrc, NULL); + dwarf_lineno(line, &lineno, NULL); + } + printf("%s:%" DW_PR_DUu "\n", linesrc, lineno); + if (line) dwarf_dealloc(dbg, linesrc, DW_DLA_STRING); } void taosPrintBackTrace() { int size = 20; From 15dd2721541109156c7e2a9168da9e771cf34d06 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 11 May 2022 02:58:17 +0800 Subject: [PATCH 07/10] enh(tmq): do not show closed consumer --- include/common/tmsg.h | 2 +- source/client/src/tmq.c | 20 ++++++++++++-------- source/dnode/mnode/impl/inc/mndConsumer.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 11 ++++++++--- tests/test/c/tmqSim.c | 13 +++++++------ 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 909b7d0877..ff2e419c75 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1493,7 +1493,7 @@ typedef struct { } SMVSubscribeRsp; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_TOPIC_FNAME_LEN]; int8_t igNotExists; } SMDropTopicReq; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0c73e000a8..0ce689f19c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1307,15 +1307,19 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { - tmq_list_t* lst = tmq_list_new(); - tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); - tmq_list_destroy(lst); - if (rsp == TMQ_RESP_ERR__SUCCESS) { - // TODO: free resources - return TMQ_RESP_ERR__SUCCESS; - } else { - return TMQ_RESP_ERR__FAIL; + if (tmq->status == TMQ_CONSUMER_STATUS__READY) { + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } } + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; } const char* tmq_err2str(tmq_resp_err_t err) { diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index a8bfe91cbf..210e336ac2 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -29,6 +29,7 @@ enum { MQ_CONSUMER_STATUS__LOST, MQ_CONSUMER_STATUS__LOST_IN_REB, MQ_CONSUMER_STATUS__LOST_REBD, + MQ_CONSUMER_STATUS__REMOVED, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 155ea6ae93..9c8c6d32eb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { } } + if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && + taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ + /*pConsumerNew->updateType = */ + /*}*/ + goto SUBSCRIBE_OVER; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); if (pTrans == NULL) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; @@ -684,9 +692,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY || pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) { pOldConsumer->status = MQ_CONSUMER_STATUS__READY; - // TODO: remove - /*if (taosArrayGetSize(pOldConsumer->assignedTopics) == 0) {*/ - /*}*/ } else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB || pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) { pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 73cdf7f59c..4a59d18d87 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) { } } - err = tmq_consumer_close(pInfo->tmq); - if (err) { - printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeRowCnt = totalRows; @@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) { return NULL; } + err = tmq_consumer_close(pInfo->tmq); + if (err) { + printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + pInfo->tmq = NULL; + // save consume result into consumeresult table saveConsumeResult(pInfo); From 7e093163dceacd27f21a52fde328a9377f67ceea Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 11 May 2022 07:43:14 +0800 Subject: [PATCH 08/10] fix: commit table in mem and file --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 76 +++++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 1 - 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 1315963090..7429d74dad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static void tsdbResetCommitFile(SCommitH *pCommith); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbWriteBlockInfo(SCommitH *pCommih); @@ -349,7 +350,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { return -1; } - +#if 0 // Loop to commit each table data for (int tid = 0; tid < pCommith->niters; tid++) { SCommitIter *pIter = pCommith->iters + tid; @@ -363,6 +364,46 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return -1; } } +#endif + // Loop to commit each table data in mem and file + int mIter = 0, fIter = 0; + int32_t nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); + + while (true) { + SBlockIdx *pIdx = NULL; + SCommitIter *pIter = NULL; + if (mIter < pCommith->niters) { + pIter = pCommith->iters + mIter; + if (fIter < nBlkIdx) { + pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); + } + } else if (fIter < nBlkIdx) { + pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); + } else { + break; + } + if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->uid <= pIdx->uid))) { + if (tsdbCommitToTable(pCommith, mIter) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + + if (pIdx && (pIter->pTable->uid == pIdx->uid)) { + ++fIter; + } + ++mIter; + } else if (pIdx) { + if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + ++fIter; + } + } if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < 0) { @@ -838,6 +879,39 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { return 0; } +static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { + SReadH *pReadh = &pCommith->readh; + int nBlocks = pIdx->numOfBlocks; + int bidx = 0; + SBlock *pBlock; + + tsdbResetCommitTable(pCommith); + + pReadh->pBlkIdx = pIdx; + + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + return -1; + } + + while (bidx < nBlocks) { + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + ++bidx; + } + + STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL}; + TSDB_COMMIT_TABLE(pCommith) = &table; + + if (tsdbWriteBlockInfo(pCommith) < 0) { + tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + return -1; + } + + return 0; +} + static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 60f2f74f5b..4638066935 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, } if (level == TSDB_RETENTION_L0) { - tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); + tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L0); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { - tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); + tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L1); return VND_RSMA1(pVnode); } else { - tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); + tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L2); return VND_RSMA2(pVnode); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 32051c2de4..61515a3be1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1943,7 +1943,6 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { - tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb)); return TSDB_CODE_SUCCESS; } From a8e91544252970b6de43239aaa6695e51d0084c2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 11 May 2022 07:48:45 +0800 Subject: [PATCH 09/10] enh: format optimization --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7429d74dad..26b1dc7274 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -366,8 +366,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { } #endif // Loop to commit each table data in mem and file - int mIter = 0, fIter = 0; - int32_t nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); + int mIter = 0, fIter = 0; + int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); while (true) { SBlockIdx *pIdx = NULL; From 8cb2edb25ca85b835a93db43b0a0a35d058f2009 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 11 May 2022 07:55:01 +0800 Subject: [PATCH 10/10] enh: code optimization --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 26b1dc7274..5f54e0cb48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -883,7 +883,6 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { SReadH *pReadh = &pCommith->readh; int nBlocks = pIdx->numOfBlocks; int bidx = 0; - SBlock *pBlock; tsdbResetCommitTable(pCommith); @@ -895,6 +894,8 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { while (bidx < nBlocks) { if (tsdbMoveBlock(pCommith, bidx) < 0) { + tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); return -1; } ++bidx;