refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-01-07 00:59:05 +08:00
parent f408c795da
commit 972f9b6948
5 changed files with 58 additions and 27 deletions

View File

@ -46,6 +46,8 @@ char *paGetToken(char *src, char **token, int32_t *tokenLen);
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
int32_t tintToHex(uint64_t val, char hex[]);
char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);

View File

@ -32,7 +32,7 @@ typedef struct SMetaStbStatsEntry {
} SMetaStbStatsEntry;
typedef struct STagFilterResEntry {
uint64_t suid; // uid for super table
// uint64_t suid; // uid for super table
SList list; // the linked list of md5 digest, extracted from the serialized tag query condition
uint32_t qTimes; // queried times for current super table
} STagFilterResEntry;
@ -533,6 +533,11 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint64_t buf[3] = {0};
buf[0] = suid;
memcpy(&buf[1], pKey, keyLen);
ASSERT(sizeof(uint64_t) + keyLen == 24);
taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
@ -543,15 +548,24 @@ int32_t metaUidFilterCachePut(SMeta* pMeta, uint64_t suid, const void* pKey, int
taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
tdListAppend(&p->list, pKey);
} else {
// check if it exists or not
SListIter iter = {0};
tdListInitIter(&(*pEntry)->list, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
uint64_t* p = (uint64_t*) pNode->data;
// key already exists in cache, quit
if (p[1] == ((uint64_t*)pKey)[1] && p[2] == ((uint64_t*)pKey)[2]) {
taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
}
tdListAppend(&(*pEntry)->list, pKey);
}
uint64_t buf[3] = {0};
buf[0] = suid;
memcpy(&buf[1], pKey, keyLen);
ASSERT(sizeof(uint64_t) + keyLen == 24);
// add to cache.
taosLRUCacheInsert(pCache, buf, sizeof(uint64_t) + keyLen, pPayload, payloadLen, freePayload, NULL,
TAOS_LRU_PRIORITY_LOW);

View File

@ -1962,6 +1962,22 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
char* p = taosMemoryMalloc(64);
int32_t offset = 6;
memcpy(p, "TID:0x", offset);
offset += tintToHex(taskId, &p[offset]);
memcpy(&p[offset], " QID:0x", 7);
offset += 7;
offset += tintToHex(queryId, &p[offset]);
p[offset] = 0;
return p;
}
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) {
@ -1978,10 +1994,9 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
pTaskInfo->id.str = p;
// char* p = taosMemoryMalloc(64);
// snprintf(p, 64, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
pTaskInfo->id.str = buildTaskId(taskId, queryId);
return pTaskInfo;
}

View File

@ -284,7 +284,6 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int64_t* p = (int64_t*) output;
if (selector == 0 || selector == 1) {
#if 1
int32_t gRemainder = nelements - count;
int32_t num = gRemainder > elems? elems:gRemainder;
@ -302,14 +301,8 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
}
count += num;
#else
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
p[_pos++] = prev_value;
}
#endif
} else {
int32_t gRemainder = (nelements - count);
int32_t num = gRemainder > elems? elems:gRemainder;
int32_t batch = num >> 2;
@ -369,10 +362,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int32_t* p = (int32_t*) output;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int32_t)prev_value;
}
} else {
@ -389,10 +379,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int16_t* p = (int16_t*) output;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int16_t)prev_value;
}
} else {
@ -410,10 +397,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int8_t *p = (int8_t *)output;
if (selector == 0 || selector == 1) {
zigzag_value = 0;
for (int32_t i = 0; i < elems && count < nelements; i++, count++) {
prev_value += ZIGZAG_DECODE(int64_t, zigzag_value);
p[_pos++] = (int8_t)prev_value;
}
} else {

View File

@ -320,6 +320,22 @@ char *strbetween(char *string, char *begin, char *end) {
return result;
}
int32_t tintToHex(uint64_t val, char hex[]) {
const char hexstr[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
int32_t j = 0;
int32_t k = 0;
while((val & (((uint64_t)0xfL) << ((15 - k) * 4))) == 0) {
k += 1;
}
for (j = 0; k < 16; ++k, ++j) {
hex[j] = hexstr[(val & (((uint64_t)0xfL) << ((15 - k) * 4))) >> (15 - k) * 4];
}
return j;
}
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]) {
int32_t i;
char hexval[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};