Merge pull request #12910 from taosdata/fix/indexMultThread

enh: opt index mutex
This commit is contained in:
Yihao Deng 2022-05-27 13:33:29 +08:00 committed by GitHub
commit abcb0f5f91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 114 additions and 89 deletions

View File

@ -62,6 +62,7 @@ extern int32_t fsDebugFlag;
extern int32_t metaDebugFlag; extern int32_t metaDebugFlag;
extern int32_t fnDebugFlag; extern int32_t fnDebugFlag;
extern int32_t smaDebugFlag; extern int32_t smaDebugFlag;
extern int32_t idxDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles); int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog(); void taosCloseLog();

View File

@ -81,7 +81,8 @@ uint16_t tsTelemPort = 80;
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value. char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
// If set to empty system will generate table name using MD5 hash. // If set to empty system will generate table name using MD5 hash.
bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol) bool tsSmlDataFormat =
true; // true means that the name and order of cols in each line are the same(only for influx protocol)
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
@ -292,6 +293,7 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", 0, 0, 255, 1) != 0) return -1;
return 0; return 0;
} }
@ -307,6 +309,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1;
return 0; return 0;
} }
@ -479,6 +482,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) {
rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32; rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32;
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32; tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32; jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32;
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
} }
static void taosSetServerLogCfg(SConfig *pCfg) { static void taosSetServerLogCfg(SConfig *pCfg) {
@ -493,6 +497,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32; fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32; fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32; smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
} }
static int32_t taosSetClientCfg(SConfig *pCfg) { static int32_t taosSetClientCfg(SConfig *pCfg) {

View File

@ -38,7 +38,7 @@ typedef struct IndexCache {
MemTable *mem, *imm; MemTable *mem, *imm;
SIndex* index; SIndex* index;
char* colName; char* colName;
int32_t version; int64_t version;
int64_t occupiedMem; int64_t occupiedMem;
int8_t type; int8_t type;
uint64_t suid; uint64_t suid;
@ -47,12 +47,12 @@ typedef struct IndexCache {
TdThreadCond finished; TdThreadCond finished;
} IndexCache; } IndexCache;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version) #define CACHE_VERSION(cache) atomic_load_64(&cache->version)
typedef struct CacheTerm { typedef struct CacheTerm {
// key // key
char* colVal; char* colVal;
int32_t version; int64_t version;
// value // value
uint64_t uid; uint64_t uid;
int8_t colType; int8_t colType;

View File

@ -34,6 +34,15 @@
extern "C" { extern "C" {
#endif #endif
// clang-format off
#define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on
typedef enum { LT, LE, GT, GE } RangeType; typedef enum { LT, LE, GT, GE } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef enum { kTypeValue, kTypeDeletion } STermValueType;
@ -134,15 +143,6 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// clang-format off
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) #define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)

View File

@ -28,12 +28,12 @@ extern "C" {
// tfile header content // tfile header content
// |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->| // |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->|
// |<-uint64_t->|<---int32_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->| // |<-uint64_t->|<---int64_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->|
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct TFileHeader { typedef struct TFileHeader {
uint64_t suid; uint64_t suid;
int32_t version; int64_t version;
char colName[TSDB_COL_NAME_LEN]; // char colName[TSDB_COL_NAME_LEN]; //
uint8_t colType; uint8_t colType;
int32_t fstOffset; int32_t fstOffset;
@ -77,6 +77,7 @@ typedef struct IndexTFile {
char* path; char* path;
TFileCache* cache; TFileCache* cache;
TFileWriter* tw; TFileWriter* tw;
TdThreadMutex mtx;
} IndexTFile; } IndexTFile;
typedef struct TFileWriterOpt { typedef struct TFileWriterOpt {
@ -101,14 +102,14 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx); TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader); void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr);
void tfileReaderRef(TFileReader* reader); void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type); TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
void tfileWriterClose(TFileWriter* tw); void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw); void tfileWriterDestroy(TFileWriter* tw);

View File

@ -557,20 +557,18 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)}; ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
int64_t ver = CACHE_VERSION(cache); int64_t ver = CACHE_VERSION(cache);
taosThreadMutexLock(&sIdx->mtx);
TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); IndexTFile* tf = (IndexTFile*)(sIdx->tindex);
if (trd != NULL) {
if (ver < trd->header.version) { taosThreadMutexLock(&tf->mtx);
ver = trd->header.version + 1; TFileReader* rd = tfileCacheGet(tf->cache, &key);
} else { taosThreadMutexUnlock(&tf->mtx);
ver += 1;
if (rd != NULL) {
ver = (ver > rd->header.version ? ver : rd->header.version) + 1;
indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver);
} }
indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver); tfileReaderUnRef(rd);
tfileReaderUnRef(trd);
} else {
indexInfo("not found reader base %p", trd);
}
taosThreadMutexUnlock(&sIdx->mtx);
return ver; return ver;
} }
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
@ -597,13 +595,14 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
} }
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf); indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
IndexTFile* tf = (IndexTFile*)sIdx->tindex;
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
taosThreadMutexLock(&sIdx->mtx); taosThreadMutexLock(&tf->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex; tfileCachePut(tf->cache, &key, reader);
tfileCachePut(ifile->cache, &key, reader); taosThreadMutexUnlock(&tf->mtx);
taosThreadMutexUnlock(&sIdx->mtx);
return ret; return ret;
END: END:
if (tw != NULL) { if (tw != NULL) {

View File

@ -80,7 +80,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal; pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version); pCt->version = atomic_load_64(&pCache->version);
char* key = indexCacheTermGet(pCt); char* key = indexCacheTermGet(pCt);
@ -133,7 +133,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal; pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version); pCt->version = atomic_load_64(&pCache->version);
char* key = indexCacheTermGet(pCt); char* key = indexCacheTermGet(pCt);
@ -185,7 +185,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal; pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version); pCt->version = atomic_load_64(&pCache->version);
char* exBuf = NULL; char* exBuf = NULL;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
@ -259,7 +259,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal; pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version); pCt->version = atomic_load_64(&pCache->version);
int8_t dType = INDEX_TYPE_GET_TYPE(term->colType); int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
int skip = 0; int skip = 0;
@ -356,7 +356,7 @@ void indexCacheDebug(IndexCache* cache) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) { if (ct != NULL) {
// TODO, add more debug info // TODO, add more debug info
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version); indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version);
} }
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
@ -377,7 +377,7 @@ void indexCacheDebug(IndexCache* cache) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) { if (ct != NULL) {
// TODO, add more debug info // TODO, add more debug info
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version); indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version);
} }
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
@ -529,7 +529,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1)); ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
memcpy(ct->colVal, term->colVal, term->nColVal); memcpy(ct->colVal, term->colVal, term->nColVal);
} }
ct->version = atomic_add_fetch_32(&pCache->version, 1); ct->version = atomic_add_fetch_64(&pCache->version, 1);
// set value // set value
ct->uid = uid; ct->uid = uid;
ct->operaType = term->operType; ct->operaType = term->operType;
@ -663,7 +663,11 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
// compare colVal // compare colVal
int32_t cmp = strcmp(lt->colVal, rt->colVal); int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) { if (cmp == 0) {
return rt->version - lt->version; if (rt->version == lt->version) {
cmp = 0;
} else {
cmp = rt->version < lt->version ? -1 : 1;
}
} }
return cmp; return cmp;
} }

View File

@ -54,9 +54,9 @@ static SArray* tfileGetFileList(const char* path);
static int tfileRmExpireFile(SArray* result); static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem); static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b); static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version); static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version);
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version); static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version);
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version); static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version);
/* /*
* search from tfile * search from tfile
*/ */
@ -151,13 +151,10 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0}; char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
indexInfo("Try to get key: %s", buf);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL || *reader == NULL) { if (reader == NULL || *reader == NULL) {
indexInfo("failed to get key: %s", buf);
return NULL; return NULL;
} }
indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf);
tfileReaderRef(*reader); tfileReaderRef(*reader);
return *reader; return *reader;
@ -168,11 +165,11 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
// remove last version index reader // remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL && *p != NULL) { if (p != NULL && *p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldRdr = *p;
taosHashRemove(tcache->tableCache, buf, sz); taosHashRemove(tcache->tableCache, buf, sz);
indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf); indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf);
oldReader->remove = true; oldRdr->remove = true;
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldRdr);
} }
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader); tfileReaderRef(reader);
@ -215,6 +212,12 @@ void tfileReaderDestroy(TFileReader* reader) {
// T_REF_INC(reader); // T_REF_INC(reader);
fstDestroy(reader->fst); fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx, reader->remove); writerCtxDestroy(reader->ctx, reader->remove);
if (reader->remove) {
indexInfo("%s is removed", reader->ctx->file.buf);
} else {
indexInfo("%s is not removed", reader->ctx->file.buf);
}
taosMemoryFree(reader); taosMemoryFree(reader);
} }
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
@ -512,7 +515,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul
return ret; return ret;
} }
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
char fullname[256] = {0}; char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname); // indexInfo("open write file name %s", fullname);
@ -529,7 +532,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return tfileWriterCreate(wcx, &tfh); return tfileWriterCreate(wcx, &tfh);
} }
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) { TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName) {
char fullname[256] = {0}; char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
@ -657,7 +660,7 @@ IndexTFile* indexTFileCreate(const char* path) {
tfileCacheDestroy(cache); tfileCacheDestroy(cache);
return NULL; return NULL;
} }
taosThreadMutexInit(&tfile->mtx, NULL);
tfile->cache = cache; tfile->cache = cache;
return tfile; return tfile;
} }
@ -665,6 +668,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
if (tfile == NULL) { if (tfile == NULL) {
return; return;
} }
taosThreadMutexDestroy(&tfile->mtx);
tfileCacheDestroy(tfile->cache); tfileCacheDestroy(tfile->cache);
taosMemoryFree(tfile); taosMemoryFree(tfile);
} }
@ -680,7 +684,10 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
taosThreadMutexLock(&pTfile->mtx);
TFileReader* reader = tfileCacheGet(pTfile->cache, &key); TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
taosThreadMutexUnlock(&pTfile->mtx);
if (reader == NULL) { if (reader == NULL) {
return 0; return 0;
} }
@ -780,8 +787,13 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
if (tf == NULL) { if (tf == NULL) {
return NULL; return NULL;
} }
TFileReader* rd = NULL;
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
return tfileCacheGet(tf->cache, &key);
taosThreadMutexLock(&tf->mtx);
rd = tfileCacheGet(tf->cache, &key);
taosThreadMutexUnlock(&tf->mtx);
return rd;
} }
static int tfileUidCompare(const void* a, const void* b) { static int tfileUidCompare(const void* a, const void* b) {
@ -1013,7 +1025,7 @@ void tfileReaderUnRef(TFileReader* reader) {
static SArray* tfileGetFileList(const char* path) { static SArray* tfileGetFileList(const char* path) {
char buf[128] = {0}; char buf[128] = {0};
uint64_t suid; uint64_t suid;
uint32_t version; int64_t version;
SArray* files = taosArrayInit(4, sizeof(void*)); SArray* files = taosArrayInit(4, sizeof(void*));
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
@ -1053,19 +1065,19 @@ static int tfileCompare(const void* a, const void* b) {
return strcmp(as, bs); return strcmp(as, bs);
} }
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) { static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) { if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%" PRId64 ".tindex", suid, col, version)) {
// read suid & colid & version success // read suid & colid & version success
return 0; return 0;
} }
return -1; return -1;
} }
// tfile name suid-colId-version.tindex // tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) { static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version) {
sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version); sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
return; return;
} }
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) { static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) {
char filename[128] = {0}; char filename[128] = {0};
tfileGenFileName(filename, suid, col, version); tfileGenFileName(filename, suid, col, version);
sprintf(fullname, "%s/%s", path, filename); sprintf(fullname, "%s/%s", path, filename);

View File

@ -279,7 +279,7 @@ static void initLog() {
const int32_t maxLogFileNum = 10; const int32_t maxLogFileNum = 10;
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143; idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str()); strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir); taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir); taosMkDir(tsLogDir);
@ -387,7 +387,7 @@ class TFileObj {
std::string path(path_); std::string path(path_);
int colId = 2; int colId = 2;
char buf[64] = {0}; char buf[64] = {0};
sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId_, header.version); sprintf(buf, "%" PRIu64 "-%d-%" PRId64 ".tindex", header.suid, colId_, header.version);
path.append("/").append(buf); path.append("/").append(buf);
fileName_ = path; fileName_ = path;
@ -794,10 +794,10 @@ class IndexObj {
} }
int sz = taosArrayGetSize(result); int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
taosArrayDestroy(result);
assert(sz == 1); assert(sz == 1);
uint64_t* ret = (uint64_t*)taosArrayGet(result, 0); uint64_t* ret = (uint64_t*)taosArrayGet(result, 0);
assert(val = *ret); assert(val = *ret);
taosArrayDestroy(result);
return sz; return sz;
} }
@ -953,8 +953,8 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) {
} }
static void single_write_and_search(IndexObj* idx) { static void single_write_and_search(IndexObj* idx) {
int target = idx->SearchOne("tag1", "Hello"); // int target = idx->SearchOne("tag1", "Hello");
target = idx->SearchOne("tag2", "Test"); // target = idx->SearchOne("tag2", "Test");
} }
static void multi_write_and_search(IndexObj* idx) { static void multi_write_and_search(IndexObj* idx) {
idx->PutOne("tag1", "Hello"); idx->PutOne("tag1", "Hello");

View File

@ -24,11 +24,7 @@
#pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare" #pragma GCC diagnostic ignored "-Wsign-compare"
#include "executor.h" #include "index.h"
#include "executorimpl.h"
#include "indexoperator.h"
#include "os.h"
#include "stub.h" #include "stub.h"
#include "taos.h" #include "taos.h"
#include "tcompare.h" #include "tcompare.h"

View File

@ -24,7 +24,7 @@ static void initLog() {
const int32_t maxLogFileNum = 10; const int32_t maxLogFileNum = 10;
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143; idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str()); strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir); taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir); taosMkDir(tsLogDir);

View File

@ -27,6 +27,14 @@ void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHan
void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
*ip = taosGetIpv4FromFqdn(localFqdn);
if (*ip == 0xFFFFFFFF) {
terrno = TSDB_CODE_RPC_FQDN_ERROR;
return -1;
}
return 0;
}
void* rpcOpen(const SRpcInit* pInit) { void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) { if (pRpc == NULL) {
@ -35,7 +43,6 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->label) { if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1);
} }
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
@ -48,10 +55,8 @@ void* rpcOpen(const SRpcInit* pInit) {
uint32_t ip = 0; uint32_t ip = 0;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
ip = taosGetIpv4FromFqdn(pInit->localFqdn); if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) {
if (ip == 0xFFFFFFFF) { tError("invalid fqdn: %s, errmsg: %s", pInit->localFqdn, terrstr());
tError("invalid fqdn: %s", pInit->localFqdn);
terrno = TSDB_CODE_RPC_FQDN_ERROR;
taosMemoryFree(pRpc); taosMemoryFree(pRpc);
return NULL; return NULL;
} }

View File

@ -923,7 +923,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
} }
if (false == taosValidIpAndPort(srv->ip, srv->port)) { if (false == taosValidIpAndPort(srv->ip, srv->port)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tError("invalid ip/port, reason: %s", terrstr()); tError("invalid ip/port, %d:%d, reason: %s", srv->ip, srv->port, terrstr());
goto End; goto End;
} }
if (false == addHandleToAcceptloop(srv)) { if (false == addHandleToAcceptloop(srv)) {

View File

@ -96,6 +96,7 @@ int32_t fsDebugFlag = 135;
int32_t metaDebugFlag = 135; int32_t metaDebugFlag = 135;
int32_t fnDebugFlag = 135; int32_t fnDebugFlag = 135;
int32_t smaDebugFlag = 135; int32_t smaDebugFlag = 135;
int32_t idxDebugFlag = 135;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgWN = 0; int64_t dbgWN = 0;
@ -759,6 +760,7 @@ void taosSetAllDebugFlag(int32_t flag) {
fsDebugFlag = flag; fsDebugFlag = flag;
fnDebugFlag = flag; fnDebugFlag = flag;
smaDebugFlag = flag; smaDebugFlag = flag;
idxDebugFlag = flag;
uInfo("all debug flag are set to %d", flag); uInfo("all debug flag are set to %d", flag);
} }