commit
a126b12fdb
File diff suppressed because it is too large
Load Diff
|
@ -484,7 +484,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
|
||||||
pSql->res.qhandle = 0x1;
|
pSql->res.qhandle = 0x1;
|
||||||
pSql->res.numOfRows = 0;
|
pSql->res.numOfRows = 0;
|
||||||
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
|
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
|
||||||
taosClearDataCache(tscCacheHandle);
|
taosCacheEmpty(tscCacheHandle);
|
||||||
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
|
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
|
||||||
tscProcessServerVer(pSql);
|
tscProcessServerVer(pSql);
|
||||||
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
|
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
|
||||||
|
|
|
@ -182,7 +182,6 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
|
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
|
||||||
|
|
||||||
if (pSql->cmd.command < TSDB_SQL_MGMT) {
|
if (pSql->cmd.command < TSDB_SQL_MGMT) {
|
||||||
pSql->ipList->port = tsDnodeShellPort;
|
pSql->ipList->port = tsDnodeShellPort;
|
||||||
|
@ -2641,7 +2640,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
|
||||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
|
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
|
||||||
assert(pMeterMetaInfo->pMeterMeta == NULL);
|
assert(pMeterMetaInfo->pMeterMeta == NULL);
|
||||||
|
|
||||||
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
|
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCachePut(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
|
||||||
pMeta->contLen, tsMeterMetaKeepTimer);
|
pMeta->contLen, tsMeterMetaKeepTimer);
|
||||||
// todo handle out of memory case
|
// todo handle out of memory case
|
||||||
if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
|
if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
|
||||||
|
@ -2750,7 +2749,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
||||||
int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
|
int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
|
||||||
|
|
||||||
pMeta->index = 0;
|
pMeta->index = 0;
|
||||||
(void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
|
(void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->res.code = TSDB_CODE_SUCCESS;
|
pSql->res.code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2857,9 +2856,9 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// release the used metricmeta
|
// release the used metricmeta
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
|
||||||
|
|
||||||
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i],
|
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
|
||||||
sizes[i], tsMetricMetaKeepTimer);
|
sizes[i], tsMetricMetaKeepTimer);
|
||||||
tfree(metricMetaList[i]);
|
tfree(metricMetaList[i]);
|
||||||
|
|
||||||
|
@ -2917,11 +2916,11 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
||||||
key[0] = pCmd->msgType + 'a';
|
key[0] = pCmd->msgType + 'a';
|
||||||
strcpy(key + 1, "showlist");
|
strcpy(key + 1, "showlist");
|
||||||
|
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
|
taosCacheRelease(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
|
||||||
|
|
||||||
int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(STableMeta);
|
int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(STableMeta);
|
||||||
pMeterMetaInfo->pMeterMeta =
|
pMeterMetaInfo->pMeterMeta =
|
||||||
(STableMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
|
(STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
|
||||||
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
|
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
|
||||||
SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
|
SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
|
||||||
|
|
||||||
|
@ -2975,14 +2974,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
|
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
|
||||||
taosClearDataCache(tscCacheHandle);
|
taosCacheEmpty(tscCacheHandle);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscProcessDropTableRsp(SSqlObj *pSql) {
|
int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
|
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
|
||||||
|
|
||||||
STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
|
STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
|
||||||
if (pMeterMeta == NULL) {
|
if (pMeterMeta == NULL) {
|
||||||
/* not in cache, abort */
|
/* not in cache, abort */
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2996,11 +2995,11 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||||
* instead.
|
* instead.
|
||||||
*/
|
*/
|
||||||
tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name);
|
tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);
|
taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true);
|
||||||
|
|
||||||
if (pMeterMetaInfo->pMeterMeta) {
|
if (pMeterMetaInfo->pMeterMeta) {
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -3009,23 +3008,23 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||||
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
|
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
|
||||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
|
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
|
||||||
|
|
||||||
STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
|
STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
|
||||||
if (pMeterMeta == NULL) { /* not in cache, abort */
|
if (pMeterMeta == NULL) { /* not in cache, abort */
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
|
tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);
|
taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true);
|
||||||
|
|
||||||
if (pMeterMetaInfo->pMeterMeta) {
|
if (pMeterMetaInfo->pMeterMeta) {
|
||||||
bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
|
bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
|
||||||
|
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
|
||||||
|
|
||||||
if (isSuperTable) { // if it is a super table, reset whole query cache
|
if (isSuperTable) { // if it is a super table, reset whole query cache
|
||||||
tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
|
tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
|
||||||
taosClearDataCache(tscCacheHandle);
|
taosCacheEmpty(tscCacheHandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3151,7 +3150,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet
|
||||||
* Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
|
* Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
|
||||||
*/
|
*/
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
|
pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
|
||||||
assert(pMeterMetaInfo->pMeterMeta != NULL);
|
assert(pMeterMetaInfo->pMeterMeta != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3177,10 +3176,10 @@ int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
|
||||||
|
|
||||||
// If this SMeterMetaInfo owns a metermeta, release it first
|
// If this SMeterMetaInfo owns a metermeta, release it first
|
||||||
if (pMeterMetaInfo->pMeterMeta != NULL) {
|
if (pMeterMetaInfo->pMeterMeta != NULL) {
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
|
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
|
||||||
if (pMeterMetaInfo->pMeterMeta != NULL) {
|
if (pMeterMetaInfo->pMeterMeta != NULL) {
|
||||||
STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
|
STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
|
||||||
|
|
||||||
|
@ -3244,7 +3243,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tscWaitingForCreateTable(pCmd);
|
tscWaitingForCreateTable(pCmd);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
|
||||||
|
|
||||||
code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ??
|
code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ??
|
||||||
} else {
|
} else {
|
||||||
|
@ -3278,9 +3277,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
|
||||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
|
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
|
||||||
tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
|
tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
|
||||||
|
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
|
||||||
|
|
||||||
SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
|
SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
|
||||||
if (ppMeta == NULL) {
|
if (ppMeta == NULL) {
|
||||||
required = true;
|
required = true;
|
||||||
break;
|
break;
|
||||||
|
@ -3308,7 +3307,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
|
||||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||||
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
|
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
|
||||||
|
|
||||||
STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
|
STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
|
||||||
tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
|
tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3353,8 +3352,8 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
|
||||||
printf("create metric key:%s, index:%d\n", tagstr, i);
|
printf("create metric key:%s, index:%d\n", tagstr, i);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
|
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
|
||||||
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
|
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *) taosCacheAcquireByName(tscCacheHandle, tagstr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -371,7 +371,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
|
||||||
pSql->sqlstr = NULL;
|
pSql->sqlstr = NULL;
|
||||||
taos_free_result_imp(pSql, 0);
|
taos_free_result_imp(pSql, 0);
|
||||||
pSql->sqlstr = sqlstr;
|
pSql->sqlstr = sqlstr;
|
||||||
taosClearDataCache(tscCacheHandle);
|
taosCacheEmpty(tscCacheHandle);
|
||||||
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
|
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
|
||||||
tscTrace("meter synchronization completed");
|
tscTrace("meter synchronization completed");
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -186,7 +186,7 @@ void taos_init_imp() {
|
||||||
refreshTime = refreshTime > 2 ? 2 : refreshTime;
|
refreshTime = refreshTime > 2 ? 2 : refreshTime;
|
||||||
refreshTime = refreshTime < 1 ? 1 : refreshTime;
|
refreshTime = refreshTime < 1 ? 1 : refreshTime;
|
||||||
|
|
||||||
if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime);
|
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
|
||||||
|
|
||||||
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
|
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
|
||||||
|
|
||||||
|
|
|
@ -505,7 +505,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||||
tfree(pDataBlock->params);
|
tfree(pDataBlock->params);
|
||||||
|
|
||||||
// free the refcount for metermeta
|
// free the refcount for metermeta
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
|
taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
|
||||||
tfree(pDataBlock);
|
tfree(pDataBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,9 +589,9 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
|
||||||
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
|
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
|
||||||
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
|
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
|
||||||
strcpy(pMeterMetaInfo->name, pDataBlock->tableId);
|
strcpy(pMeterMetaInfo->name, pDataBlock->tableId);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
|
taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
|
||||||
|
|
||||||
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
|
pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
|
||||||
} else {
|
} else {
|
||||||
assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
|
assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
|
||||||
}
|
}
|
||||||
|
@ -665,7 +665,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
|
||||||
* due to operation such as drop database. So here we add the reference count directly instead of invoke
|
* due to operation such as drop database. So here we add the reference count directly instead of invoke
|
||||||
* taosGetDataFromCache, which may return NULL value.
|
* taosGetDataFromCache, which may return NULL value.
|
||||||
*/
|
*/
|
||||||
dataBuf->pMeterMeta = taosGetDataFromExists(tscCacheHandle, pMeterMeta);
|
dataBuf->pMeterMeta = taosCacheAcquireByData(tscCacheHandle, pMeterMeta);
|
||||||
assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL);
|
assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL);
|
||||||
|
|
||||||
*dataBlocks = dataBuf;
|
*dataBlocks = dataBuf;
|
||||||
|
@ -1940,8 +1940,8 @@ void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache);
|
taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache);
|
||||||
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache);
|
taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscResetForNextRetrieve(SSqlRes* pRes) {
|
void tscResetForNextRetrieve(SSqlRes* pRes) {
|
||||||
|
@ -2071,16 +2071,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
SMeterMetaInfo* pFinalInfo = NULL;
|
SMeterMetaInfo* pFinalInfo = NULL;
|
||||||
|
|
||||||
if (pPrevSql == NULL) {
|
if (pPrevSql == NULL) {
|
||||||
STableMeta* pMeterMeta = taosGetDataFromCache(tscCacheHandle, name);
|
STableMeta* pMeterMeta = taosCacheAcquireByName(tscCacheHandle, name);
|
||||||
SSuperTableMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
|
SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
|
||||||
|
|
||||||
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
|
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
|
||||||
pMeterMetaInfo->tagColumnIndex);
|
pMeterMetaInfo->tagColumnIndex);
|
||||||
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
|
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
|
||||||
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
|
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
|
||||||
|
|
||||||
STableMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
|
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
|
||||||
SSuperTableMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
|
SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
|
||||||
|
|
||||||
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags,
|
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags,
|
||||||
pMeterMetaInfo->tagColumnIndex);
|
pMeterMetaInfo->tagColumnIndex);
|
||||||
|
|
|
@ -20,7 +20,63 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdbool.h>
|
#include "os.h"
|
||||||
|
#include "tref.h"
|
||||||
|
#include "hash.h"
|
||||||
|
|
||||||
|
typedef struct SCacheStatis {
|
||||||
|
int64_t missCount;
|
||||||
|
int64_t hitCount;
|
||||||
|
int64_t totalAccess;
|
||||||
|
int64_t refreshCount;
|
||||||
|
int32_t numOfCollision;
|
||||||
|
} SCacheStatis;
|
||||||
|
|
||||||
|
typedef struct SCacheDataNode {
|
||||||
|
uint64_t addedTime; // the added time when this element is added or updated into cache
|
||||||
|
uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache
|
||||||
|
uint64_t signature;
|
||||||
|
uint32_t size; // allocated size for current SCacheDataNode
|
||||||
|
uint16_t keySize : 15;
|
||||||
|
bool inTrash : 1; // denote if it is in trash or not
|
||||||
|
T_REF_DECLARE()
|
||||||
|
char *key;
|
||||||
|
char data[];
|
||||||
|
} SCacheDataNode;
|
||||||
|
|
||||||
|
typedef struct STrashElem {
|
||||||
|
struct STrashElem *prev;
|
||||||
|
struct STrashElem *next;
|
||||||
|
SCacheDataNode * pData;
|
||||||
|
} STrashElem;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
|
||||||
|
int64_t refreshTime;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* to accommodate the old datanode which has the same key value of new one in hashList
|
||||||
|
* when an new node is put into cache, if an existed one with the same key:
|
||||||
|
* 1. if the old one does not be referenced, update it.
|
||||||
|
* 2. otherwise, move the old one to pTrash, addedTime the new one.
|
||||||
|
*
|
||||||
|
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
||||||
|
*/
|
||||||
|
STrashElem * pTrash;
|
||||||
|
void * tmrCtrl;
|
||||||
|
void * pTimer;
|
||||||
|
SCacheStatis statistics;
|
||||||
|
SHashObj * pHashTable;
|
||||||
|
int numOfElemsInTrash; // number of element in trash
|
||||||
|
int16_t deleting; // set the deleting flag to stop refreshing ASAP.
|
||||||
|
|
||||||
|
#if defined(LINUX)
|
||||||
|
pthread_rwlock_t lock;
|
||||||
|
#else
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
} SCacheObj;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -30,7 +86,7 @@ extern "C" {
|
||||||
* not referenced by other objects
|
* not referenced by other objects
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSeconds);
|
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add data into cache
|
* add data into cache
|
||||||
|
@ -42,7 +98,35 @@ void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSec
|
||||||
* @param keepTime survival time in second
|
* @param keepTime survival time in second
|
||||||
* @return cached element
|
* @return cached element
|
||||||
*/
|
*/
|
||||||
void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
|
void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get data from cache
|
||||||
|
* @param handle cache object
|
||||||
|
* @param key key
|
||||||
|
* @return cached data or NULL
|
||||||
|
*/
|
||||||
|
void *taosCacheAcquireByName(void *handle, char *key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add one reference count for the exist data, and assign this data for a new owner.
|
||||||
|
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
|
||||||
|
* This procedure is a faster version of taosCacheAcquireByName function, which avoids the sideeffect of the problem of
|
||||||
|
* the data is moved to trash, and taosCacheAcquireByName will fail to retrieve it again.
|
||||||
|
*
|
||||||
|
* @param handle
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void *taosCacheAcquireByData(void *handle, void *data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* transfer the ownership of data in cache to another object without increasing reference count.
|
||||||
|
* @param handle
|
||||||
|
* @param data
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void *taosCacheTransfer(void *handle, void **data);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* remove data in cache, the data will not be removed immediately.
|
* remove data in cache, the data will not be removed immediately.
|
||||||
|
@ -52,59 +136,26 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
|
||||||
* @param _remove force model, reduce the ref count and move the data into
|
* @param _remove force model, reduce the ref count and move the data into
|
||||||
* pTrash
|
* pTrash
|
||||||
*/
|
*/
|
||||||
void taosRemoveDataFromCache(void *handle, void **data, bool _remove);
|
void taosCacheRelease(void *handle, void **data, bool _remove);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* update data in cache
|
* move all data node into trash, clear node in trash can if it is not referenced by any clients
|
||||||
* @param handle hash object handle(pointer)
|
* @param handle
|
||||||
* @param key key for hash
|
|
||||||
* @param pData actually data
|
|
||||||
* @param size length of data
|
|
||||||
* @param duration survival time of this object in cache
|
|
||||||
* @return new referenced data
|
|
||||||
*/
|
*/
|
||||||
void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration);
|
void taosCacheEmpty(SCacheObj *pCacheObj);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get data from cache
|
* release all allocated memory and destroy the cache object.
|
||||||
* @param handle cache object
|
*
|
||||||
* @param key key
|
* This function only set the deleting flag, and the specific work of clean up cache is delegated to
|
||||||
* @return cached data or NULL
|
* taosCacheRefresh function, which will executed every SCacheObj->refreshTime sec.
|
||||||
*/
|
*
|
||||||
void *taosGetDataFromCache(void *handle, char *key);
|
* If the value of SCacheObj->refreshTime is too large, the taosCacheRefresh function may not be invoked
|
||||||
|
* before the main thread terminated, in which case all allocated resources are simply recycled by OS.
|
||||||
/**
|
|
||||||
* release all allocated memory and destroy the cache object
|
|
||||||
*
|
*
|
||||||
* @param handle
|
* @param handle
|
||||||
*/
|
*/
|
||||||
void taosCleanUpDataCache(void *handle);
|
void taosCacheCleanup(SCacheObj *pCacheObj);
|
||||||
|
|
||||||
/**
|
|
||||||
* move all data node into trash,clear node in trash can if it is not referenced by client
|
|
||||||
* @param handle
|
|
||||||
*/
|
|
||||||
void taosClearDataCache(void *handle);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add one reference count for the exist data, and assign this data for a new owner.
|
|
||||||
* The new owner needs to invoke the taosRemoveDataFromCache when it does not need this data anymore.
|
|
||||||
* This procedure is a faster version of taosGetDataFromCache function, which avoids the sideeffect of the problem of the
|
|
||||||
* data is moved to trash, and taosGetDataFromCache will fail to retrieve it again.
|
|
||||||
*
|
|
||||||
* @param handle
|
|
||||||
* @param data
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
void* taosGetDataFromExists(void* handle, void* data);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* transfer the ownership of data in cache to another object without increasing reference count.
|
|
||||||
* @param handle
|
|
||||||
* @param data
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
void* taosTransferDataInCache(void* handle, void** data);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,17 +23,18 @@ extern "C" {
|
||||||
#include "hashfunc.h"
|
#include "hashfunc.h"
|
||||||
|
|
||||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
||||||
#define HASH_VALUE_IN_TRASH (-1)
|
|
||||||
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
|
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
|
||||||
#define HASH_INDEX(v, c) ((v) & ((c)-1))
|
#define HASH_INDEX(v, c) ((v) & ((c)-1))
|
||||||
|
|
||||||
|
typedef void (*_hash_free_fn_t)(void *param);
|
||||||
|
|
||||||
typedef struct SHashNode {
|
typedef struct SHashNode {
|
||||||
char *key; // null-terminated string
|
char *key;
|
||||||
union {
|
union {
|
||||||
struct SHashNode * prev;
|
struct SHashNode * prev;
|
||||||
struct SHashEntry *prev1;
|
struct SHashEntry *prev1;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SHashNode *next;
|
struct SHashNode *next;
|
||||||
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
|
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
|
||||||
uint32_t keyLen; // length of the key
|
uint32_t keyLen; // length of the key
|
||||||
|
@ -46,18 +47,27 @@ typedef struct SHashEntry {
|
||||||
} SHashEntry;
|
} SHashEntry;
|
||||||
|
|
||||||
typedef struct SHashObj {
|
typedef struct SHashObj {
|
||||||
SHashEntry **hashList;
|
SHashEntry ** hashList;
|
||||||
size_t capacity; // number of slots
|
size_t capacity; // number of slots
|
||||||
size_t size; // number of elements in hash table
|
size_t size; // number of elements in hash table
|
||||||
_hash_fn_t hashFp; // hash function
|
_hash_fn_t hashFp; // hash function
|
||||||
|
_hash_free_fn_t freeFp; // hash node free callback function
|
||||||
|
|
||||||
#if defined (LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_t* lock;
|
pthread_rwlock_t *lock;
|
||||||
#else
|
#else
|
||||||
pthread_mutex_t* lock;
|
pthread_mutex_t *lock;
|
||||||
#endif
|
#endif
|
||||||
} SHashObj;
|
} SHashObj;
|
||||||
|
|
||||||
|
typedef struct SHashMutableIterator {
|
||||||
|
SHashObj * pHashObj;
|
||||||
|
int32_t entryIndex;
|
||||||
|
SHashNode *pCur;
|
||||||
|
SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current
|
||||||
|
int32_t num; // already check number of elements in hash table
|
||||||
|
} SHashMutableIterator;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* init the hash table
|
* init the hash table
|
||||||
*
|
*
|
||||||
|
@ -102,7 +112,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen);
|
||||||
* @param key
|
* @param key
|
||||||
* @param keyLen
|
* @param keyLen
|
||||||
*/
|
*/
|
||||||
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
|
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* clean up hash table
|
* clean up hash table
|
||||||
|
@ -110,6 +120,41 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
|
||||||
*/
|
*/
|
||||||
void taosHashCleanup(SHashObj *pHashObj);
|
void taosHashCleanup(SHashObj *pHashObj);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the free callback function
|
||||||
|
* This function if set will be invoked right before freeing each hash node
|
||||||
|
* @param pHashObj
|
||||||
|
*/
|
||||||
|
void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param pHashObj
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SHashMutableIterator* taosHashCreateIter(SHashObj *pHashObj);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param iter
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
bool taosHashIterNext(SHashMutableIterator *iter);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param iter
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void *taosHashIterGet(SHashMutableIterator *iter);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param iter
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void* taosHashDestroyIter(SHashMutableIterator* iter);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param pHashObj
|
* @param pHashObj
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_TREF_H
|
||||||
|
#define TDENGINE_TREF_H
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
|
typedef void (*_ref_fn_t)(const void* pObj);
|
||||||
|
|
||||||
|
#define T_REF_DECLARE() \
|
||||||
|
struct { \
|
||||||
|
int16_t val; \
|
||||||
|
} _ref;
|
||||||
|
|
||||||
|
#define T_REF_REGISTER_FUNC(s, e) \
|
||||||
|
struct { \
|
||||||
|
_ref_fn_t start; \
|
||||||
|
_ref_fn_t end; \
|
||||||
|
} _ref_func = {.begin = (s), .end = (e)};
|
||||||
|
|
||||||
|
#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1));
|
||||||
|
|
||||||
|
#define T_REF_INC_WITH_CB(x, p) \
|
||||||
|
do { \
|
||||||
|
int32_t v = atomic_add_fetch_32(&((x)->_ref.val), 1); \
|
||||||
|
if (v == 1 && (p)->_ref_func.begin != NULL) { \
|
||||||
|
(p)->_ref_func.begin((x)); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1));
|
||||||
|
|
||||||
|
#define T_REF_DEC_WITH_CB(x, p) \
|
||||||
|
do { \
|
||||||
|
int32_t v = atomic_sub_fetch_16(&((x)->_ref.val), 1); \
|
||||||
|
if (v == 0 && (p)->_ref_func.end != NULL) { \
|
||||||
|
(p)->_ref_func.end((x)); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
#define T_REF_VAL_CHECK(x) assert((x)->_ref.val >= 0);
|
||||||
|
|
||||||
|
#define T_REF_VAL_GET(x) (x)->_ref.val
|
||||||
|
|
||||||
|
#endif // TDENGINE_TREF_H
|
|
@ -24,8 +24,8 @@ static FORCE_INLINE void __wr_lock(void *lock) {
|
||||||
if (lock == NULL) {
|
if (lock == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined (LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_wrlock(lock);
|
pthread_rwlock_wrlock(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_lock(lock);
|
pthread_mutex_lock(lock);
|
||||||
|
@ -36,8 +36,8 @@ static FORCE_INLINE void __rd_lock(void *lock) {
|
||||||
if (lock == NULL) {
|
if (lock == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined (LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_rdlock(lock);
|
pthread_rwlock_rdlock(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_lock(lock);
|
pthread_mutex_lock(lock);
|
||||||
|
@ -48,8 +48,8 @@ static FORCE_INLINE void __unlock(void *lock) {
|
||||||
if (lock == NULL) {
|
if (lock == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined (LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_unlock(lock);
|
pthread_rwlock_unlock(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_unlock(lock);
|
pthread_mutex_unlock(lock);
|
||||||
|
@ -60,8 +60,8 @@ static FORCE_INLINE int32_t __lock_init(void *lock) {
|
||||||
if (lock == NULL) {
|
if (lock == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined (LINUX)
|
#if defined(LINUX)
|
||||||
return pthread_rwlock_init(lock, NULL);
|
return pthread_rwlock_init(lock, NULL);
|
||||||
#else
|
#else
|
||||||
return pthread_mutex_init(lock, NULL);
|
return pthread_mutex_init(lock, NULL);
|
||||||
|
@ -72,8 +72,8 @@ static FORCE_INLINE void __lock_destroy(void *lock) {
|
||||||
if (lock == NULL) {
|
if (lock == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined (LINUX)
|
#if defined(LINUX)
|
||||||
pthread_rwlock_destroy(lock);
|
pthread_rwlock_destroy(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_destroy(lock);
|
pthread_mutex_destroy(lock);
|
||||||
|
@ -107,7 +107,7 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get SHashNode from hashlist, nodes from trash are not included.
|
* get SHashNode from hashlist, nodes from trash are not included.
|
||||||
* @param pHashObj Cache objection
|
* @param pHashObj Cache objection
|
||||||
* @param key key for hash
|
* @param key key for hash
|
||||||
* @param keyLen key length
|
* @param keyLen key length
|
||||||
* @return
|
* @return
|
||||||
|
@ -155,24 +155,24 @@ static void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
|
|
||||||
int32_t newSize = pHashObj->capacity << 1U;
|
int32_t newSize = pHashObj->capacity << 1U;
|
||||||
if (newSize > HASH_MAX_CAPACITY) {
|
if (newSize > HASH_MAX_CAPACITY) {
|
||||||
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pHashObj->capacity,
|
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
|
||||||
HASH_MAX_CAPACITY);
|
pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry*) * newSize);
|
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
|
||||||
if (pNewEntry == NULL) {
|
if (pNewEntry == NULL) {
|
||||||
pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
|
pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pHashObj->hashList = pNewEntry;
|
pHashObj->hashList = pNewEntry;
|
||||||
for(int32_t i = pHashObj->capacity; i < newSize; ++i) {
|
for (int32_t i = pHashObj->capacity; i < newSize; ++i) {
|
||||||
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
||||||
}
|
}
|
||||||
|
|
||||||
pHashObj->capacity = newSize;
|
pHashObj->capacity = newSize;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
|
@ -182,7 +182,7 @@ static void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
if (pNode != NULL) {
|
if (pNode != NULL) {
|
||||||
assert(pNode->prev1 == pEntry && pEntry->num > 0);
|
assert(pNode->prev1 == pEntry && pEntry->num > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
if (j == i) { // this key resides in the same slot, no need to relocate it
|
if (j == i) { // this key resides in the same slot, no need to relocate it
|
||||||
|
@ -192,13 +192,13 @@ static void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
|
|
||||||
// remove from current slot
|
// remove from current slot
|
||||||
assert(pNode->prev1 != NULL);
|
assert(pNode->prev1 != NULL);
|
||||||
|
|
||||||
if (pNode->prev1 == pEntry) { // first node of the overflow linked list
|
if (pNode->prev1 == pEntry) { // first node of the overflow linked list
|
||||||
pEntry->next = pNode->next;
|
pEntry->next = pNode->next;
|
||||||
} else {
|
} else {
|
||||||
pNode->prev->next = pNode->next;
|
pNode->prev->next = pNode->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
pEntry->num--;
|
pEntry->num--;
|
||||||
assert(pEntry->num >= 0);
|
assert(pEntry->num >= 0);
|
||||||
|
|
||||||
|
@ -214,13 +214,13 @@ static void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
|
|
||||||
if (pNewIndexEntry->next != NULL) {
|
if (pNewIndexEntry->next != NULL) {
|
||||||
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
|
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
|
||||||
|
|
||||||
pNewIndexEntry->next->prev = pNode;
|
pNewIndexEntry->next->prev = pNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
pNode->next = pNewIndexEntry->next;
|
pNode->next = pNewIndexEntry->next;
|
||||||
pNode->prev1 = pNewIndexEntry;
|
pNode->prev1 = pNewIndexEntry;
|
||||||
|
|
||||||
pNewIndexEntry->next = pNode;
|
pNewIndexEntry->next = pNode;
|
||||||
pNewIndexEntry->num++;
|
pNewIndexEntry->num++;
|
||||||
|
|
||||||
|
@ -258,14 +258,14 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
||||||
|
|
||||||
pHashObj->hashFp = fn;
|
pHashObj->hashFp = fn;
|
||||||
|
|
||||||
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry*));
|
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry *));
|
||||||
if (pHashObj->hashList == NULL) {
|
if (pHashObj->hashList == NULL) {
|
||||||
free(pHashObj);
|
free(pHashObj);
|
||||||
pError("failed to allocate memory, reason:%s", strerror(errno));
|
pError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < pHashObj->capacity; ++i) {
|
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,7 +276,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
||||||
pHashObj->lock = calloc(1, sizeof(pthread_mutex_t));
|
pHashObj->lock = calloc(1, sizeof(pthread_mutex_t));
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
if (__lock_init(pHashObj->lock) != 0) {
|
if (__lock_init(pHashObj->lock) != 0) {
|
||||||
free(pHashObj->hashList);
|
free(pHashObj->hashList);
|
||||||
free(pHashObj);
|
free(pHashObj);
|
||||||
|
@ -347,7 +347,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||||
|
|
||||||
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pEntry = pHashObj->hashList[index];
|
SHashEntry *pEntry = pHashObj->hashList[index];
|
||||||
|
|
||||||
pNode->next = pEntry->next;
|
pNode->next = pEntry->next;
|
||||||
|
|
||||||
if (pEntry->next) {
|
if (pEntry->next) {
|
||||||
|
@ -356,7 +356,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||||
|
|
||||||
pEntry->next = pNode;
|
pEntry->next = pNode;
|
||||||
pNode->prev1 = pEntry;
|
pNode->prev1 = pEntry;
|
||||||
|
|
||||||
pEntry->num++;
|
pEntry->num++;
|
||||||
pHashObj->size++;
|
pHashObj->size++;
|
||||||
}
|
}
|
||||||
|
@ -365,7 +365,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj) {
|
||||||
if (pHashObj == NULL) {
|
if (pHashObj == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pHashObj->size;
|
return pHashObj->size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,7 +430,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) {
|
||||||
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
|
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
|
||||||
__wr_lock(pHashObj->lock);
|
__wr_lock(pHashObj->lock);
|
||||||
|
|
||||||
uint32_t val = 0;
|
uint32_t val = 0;
|
||||||
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val);
|
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
__unlock(pHashObj->lock);
|
__unlock(pHashObj->lock);
|
||||||
|
@ -446,13 +446,13 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
|
||||||
pNode->prev->next = pNext;
|
pNode->prev->next = pNext;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNext != NULL) {
|
if (pNext != NULL) {
|
||||||
pNext->prev = pNode->prev;
|
pNext->prev = pNode->prev;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
|
|
||||||
SHashEntry *pEntry = pHashObj->hashList[index];
|
SHashEntry *pEntry = pHashObj->hashList[index];
|
||||||
pEntry->num--;
|
pEntry->num--;
|
||||||
|
|
||||||
|
@ -483,10 +483,14 @@ void taosHashCleanup(SHashObj *pHashObj) {
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
pNext = pNode->next;
|
pNext = pNode->next;
|
||||||
|
if (pHashObj->freeFp) {
|
||||||
|
pHashObj->freeFp(pNode->data);
|
||||||
|
}
|
||||||
|
|
||||||
free(pNode);
|
free(pNode);
|
||||||
pNode = pNext;
|
pNode = pNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pEntry);
|
tfree(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -496,24 +500,122 @@ void taosHashCleanup(SHashObj *pHashObj) {
|
||||||
__unlock(pHashObj->lock);
|
__unlock(pHashObj->lock);
|
||||||
__lock_destroy(pHashObj->lock);
|
__lock_destroy(pHashObj->lock);
|
||||||
|
|
||||||
|
tfree(pHashObj->lock);
|
||||||
memset(pHashObj, 0, sizeof(SHashObj));
|
memset(pHashObj, 0, sizeof(SHashObj));
|
||||||
free(pHashObj);
|
free(pHashObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp) {
|
||||||
|
if (pHashObj == NULL || freeFp == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pHashObj->freeFp = freeFp;
|
||||||
|
}
|
||||||
|
|
||||||
|
SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
|
||||||
|
SHashMutableIterator *pIter = calloc(1, sizeof(SHashMutableIterator));
|
||||||
|
if (pIter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->pHashObj = pHashObj;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
|
||||||
|
assert(pIter != NULL);
|
||||||
|
|
||||||
|
while (pIter->entryIndex < pIter->pHashObj->capacity) {
|
||||||
|
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
|
||||||
|
if (pEntry->next == NULL) {
|
||||||
|
pIter->entryIndex++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pEntry->next;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool taosHashIterNext(SHashMutableIterator *pIter) {
|
||||||
|
if (pIter == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size = taosHashGetSize(pIter->pHashObj);
|
||||||
|
if (size == 0 || pIter->num >= size) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check the first one
|
||||||
|
if (pIter->num == 0) {
|
||||||
|
assert(pIter->pCur == NULL && pIter->pNext == NULL);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
|
||||||
|
if (pEntry->next == NULL) {
|
||||||
|
pIter->entryIndex++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->pCur = pEntry->next;
|
||||||
|
|
||||||
|
if (pIter->pCur->next) {
|
||||||
|
pIter->pNext = pIter->pCur->next;
|
||||||
|
} else {
|
||||||
|
pIter->pNext = getNextHashNode(pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->num++;
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
assert(pIter->pCur != NULL);
|
||||||
|
if (pIter->pNext) {
|
||||||
|
pIter->pCur = pIter->pNext;
|
||||||
|
} else { // no more data in the hash list
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->num++;
|
||||||
|
|
||||||
|
if (pIter->pCur->next) {
|
||||||
|
pIter->pNext = pIter->pCur->next;
|
||||||
|
} else {
|
||||||
|
pIter->pNext = getNextHashNode(pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : iter->pCur->data; }
|
||||||
|
|
||||||
|
void *taosHashDestroyIter(SHashMutableIterator *iter) {
|
||||||
|
if (iter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(iter);
|
||||||
|
}
|
||||||
|
|
||||||
// for profile only
|
// for profile only
|
||||||
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj* pHashObj) {
|
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
|
||||||
if (pHashObj == NULL || pHashObj->size == 0) {
|
if (pHashObj == NULL || pHashObj->size == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
|
||||||
for(int32_t i = 0; i < pHashObj->size; ++i) {
|
for (int32_t i = 0; i < pHashObj->size; ++i) {
|
||||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||||
if (num < pEntry->num) {
|
if (num < pEntry->num) {
|
||||||
num = pEntry->num;
|
num = pEntry->num;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue