Merge remote-tracking branch 'origin/3.0' into feature/shm
This commit is contained in:
commit
b16e686809
|
@ -38,11 +38,14 @@ typedef struct SRpcConnInfo {
|
||||||
|
|
||||||
typedef struct SRpcMsg {
|
typedef struct SRpcMsg {
|
||||||
tmsg_t msgType;
|
tmsg_t msgType;
|
||||||
|
tmsg_t expectMsgType;
|
||||||
void * pCont;
|
void * pCont;
|
||||||
int contLen;
|
int contLen;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void *handle; // rpc handle returned to app
|
void * handle; // rpc handle returned to app
|
||||||
void *ahandle; // app handle set by client
|
void * ahandle; // app handle set by client
|
||||||
|
int noResp; // has response or not(default 0 indicate resp);
|
||||||
|
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -107,10 +107,8 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
|
||||||
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
|
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
|
||||||
|
|
||||||
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
||||||
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
|
||||||
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
|
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult);
|
||||||
int32_t nMaxResult);
|
|
||||||
|
|
||||||
|
|
||||||
// STsdbCfg
|
// STsdbCfg
|
||||||
int tsdbOptionsInit(STsdbCfg *);
|
int tsdbOptionsInit(STsdbCfg *);
|
||||||
|
|
|
@ -636,7 +636,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode
|
// Decode
|
||||||
pCfg = (STSma *)malloc(sizeof(STSma));
|
pCfg = (STSma *)calloc(1, sizeof(STSma));
|
||||||
if (pCfg == NULL) {
|
if (pCfg == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ static const char *TSDB_SMA_DNAME[] = {
|
||||||
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
|
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
|
||||||
"rsma", // TSDB_SMA_TYPE_ROLLUP
|
"rsma", // TSDB_SMA_TYPE_ROLLUP
|
||||||
};
|
};
|
||||||
|
#define SMA_CHECK_HASH
|
||||||
#undef SMA_PRINT_DEBUG_LOG
|
#undef SMA_PRINT_DEBUG_LOG
|
||||||
#define SMA_STORAGE_TSDB_DAYS 30
|
#define SMA_STORAGE_TSDB_DAYS 30
|
||||||
#define SMA_STORAGE_TSDB_TIMES 10
|
#define SMA_STORAGE_TSDB_TIMES 10
|
||||||
|
@ -72,35 +72,43 @@ typedef struct {
|
||||||
|
|
||||||
struct SSmaStat {
|
struct SSmaStat {
|
||||||
SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem
|
SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem
|
||||||
|
T_REF_DECLARE()
|
||||||
};
|
};
|
||||||
|
|
||||||
// declaration of static functions
|
// declaration of static functions
|
||||||
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
// expired window
|
||||||
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
||||||
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
|
||||||
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
|
||||||
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
|
|
||||||
int32_t nMaxResult);
|
|
||||||
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg);
|
|
||||||
|
|
||||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||||
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
||||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
|
||||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
|
||||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
|
||||||
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
|
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
||||||
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
|
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
||||||
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
|
|
||||||
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
|
|
||||||
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
|
|
||||||
|
|
||||||
|
// read data
|
||||||
|
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
||||||
|
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
||||||
|
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
|
||||||
|
int32_t nMaxResult);
|
||||||
|
|
||||||
|
// insert data
|
||||||
|
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
||||||
|
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
|
||||||
|
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
|
||||||
|
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
|
||||||
|
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
|
||||||
|
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
|
||||||
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
|
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
|
||||||
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
|
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
|
||||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
||||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
|
||||||
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
|
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
|
||||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
|
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
|
||||||
|
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||||
|
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||||
|
|
||||||
|
// implementation
|
||||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
||||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]);
|
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]);
|
||||||
}
|
}
|
||||||
|
@ -192,6 +200,21 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
|
||||||
|
if (pStat == NULL) return 0;
|
||||||
|
int ref = T_REF_INC(pStat);
|
||||||
|
tsdbDebug("vgId:%d ref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) {
|
||||||
|
if (pStat == NULL) return 0;
|
||||||
|
|
||||||
|
int ref = T_REF_DEC(pStat);
|
||||||
|
tsdbDebug("vgId:%d unref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
||||||
ASSERT(pSmaStat != NULL);
|
ASSERT(pSmaStat != NULL);
|
||||||
|
|
||||||
|
@ -296,7 +319,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
||||||
* @param msg
|
* @param msg
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
||||||
if (!msg || !pTsdb->pMeta) {
|
if (!msg || !pTsdb->pMeta) {
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -307,27 +330,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, smaType);
|
// TODO: decode the msg from Stream Computing module => start
|
||||||
|
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||||
// TODO: decode the msg => start
|
|
||||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
|
||||||
// const char * indexName = SMA_TEST_INDEX_NAME;
|
|
||||||
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
||||||
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
|
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
|
||||||
int64_t now = taosGetTimestampMs();
|
TSKEY skey1 = 1646987196 * 1e3;
|
||||||
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
expiredWindows[i] = now + i;
|
expiredWindows[i] = skey1 + i;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: decode the msg <= end
|
// TODO: decode the msg <= end
|
||||||
|
|
||||||
|
SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType);
|
||||||
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||||
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
||||||
|
|
||||||
|
tsdbRefSmaStat(pTsdb, pStat);
|
||||||
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
|
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
// Response to stream computing: OOM
|
// Response to stream computing: OOM
|
||||||
// For query, if the indexName not found, the TSDB should tell query module to query raw TS data.
|
// For query, if the indexUid not found, the TSDB should tell query module to query raw TS data.
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,6 +361,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||||
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
|
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
free(pItem);
|
free(pItem);
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
|
tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -347,18 +372,14 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||||
// If error occurs during put smaStatItem, free the resources of pItem
|
// If error occurs during put smaStatItem, free the resources of pItem
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
free(pItem);
|
free(pItem);
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
|
||||||
int size1 = taosHashGetSize(pItem1->expiredWindows);
|
|
||||||
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
int8_t state = TSDB_SMA_STAT_EXPIRED;
|
||||||
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) {
|
||||||
if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
if (taosHashPut(pItem->expiredWindows, (void *)(expiredWindows + i), sizeof(TSKEY), &state, sizeof(state)) != 0) {
|
||||||
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
|
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
|
||||||
// tell query module to query raw TS data.
|
// tell query module to query raw TS data.
|
||||||
// N.B.
|
// N.B.
|
||||||
|
@ -368,37 +389,43 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
tfree(pItem->pSma);
|
tfree(pItem->pSma);
|
||||||
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
|
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid,
|
||||||
|
expiredWindows[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
|
||||||
int size2 = taosHashGetSize(pItem1->expiredWindows);
|
|
||||||
tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
|
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
|
||||||
SSmaStatItem *pItem = NULL;
|
SSmaStatItem *pItem = NULL;
|
||||||
|
|
||||||
// TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode?
|
tsdbRefSmaStat(pTsdb, pStat);
|
||||||
|
|
||||||
if (pStat && pStat->smaStatItems) {
|
if (pStat && pStat->smaStatItems) {
|
||||||
pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
|
pItem = *(SSmaStatItem **)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
if (pItem != NULL) {
|
if (pItem != NULL) {
|
||||||
// TODO: reset time window for the sma data blocks
|
// pItem resides in hash buffer all the time unless drop sma index
|
||||||
|
// TODO: multithread protect
|
||||||
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
|
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
|
||||||
// error handling
|
// error handling
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
|
tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " failed", REPO_ID(pTsdb),
|
||||||
|
skey, indexUid);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// error handling
|
// error handling
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
|
tsdbWarn("vgId:%d expired window %" PRIi64 " not exists for sma index %" PRIi64, REPO_ID(pTsdb), skey, indexUid);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,7 +732,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||||
// TODO:tsdbEndTSmaCommit();
|
// TODO:tsdbEndTSmaCommit();
|
||||||
|
|
||||||
// Step 3: reset the SSmaStat
|
// Step 3: reset the SSmaStat
|
||||||
tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
|
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
|
||||||
|
|
||||||
tsdbDestroyTSmaWriteH(&tSmaH);
|
tsdbDestroyTSmaWriteH(&tSmaH);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -751,7 +778,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||||
// TODO:tsdbEndTSmaCommit();
|
// TODO:tsdbEndTSmaCommit();
|
||||||
|
|
||||||
// reset the SSmaStat
|
// reset the SSmaStat
|
||||||
tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
|
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -839,7 +866,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
||||||
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
|
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
|
||||||
int32_t nMaxResult) {
|
int32_t nMaxResult) {
|
||||||
if (!pTsdb->pTSmaEnv) {
|
if (!pTsdb->pTSmaEnv) {
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
@ -847,12 +874,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
|
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
||||||
|
SSmaStatItem *pItem = *(SSmaStatItem **)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
|
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
|
||||||
// it's NULL.
|
// it's NULL.
|
||||||
|
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
||||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||||
tsdbWarn("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid);
|
tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -866,16 +895,23 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) {
|
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) {
|
||||||
// TODO: mark this window as expired.
|
// TODO: mark this window as expired.
|
||||||
|
tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb),
|
||||||
|
querySKey, indexUid);
|
||||||
|
} else {
|
||||||
|
tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey,
|
||||||
|
indexUid);
|
||||||
}
|
}
|
||||||
|
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
STSmaReadH tReadH = {0};
|
STSmaReadH tReadH = {0};
|
||||||
tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
|
tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
|
||||||
tsdbCloseDBF(&tReadH.dFile);
|
tsdbCloseDBF(&tReadH.dFile);
|
||||||
|
|
||||||
tsdbInitTSmaFile(&tReadH, querySkey);
|
tsdbInitTSmaFile(&tReadH, querySKey);
|
||||||
if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
|
if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
|
||||||
tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
|
tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -883,7 +919,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
||||||
|
|
||||||
char smaKey[SMA_KEY_LEN] = {0};
|
char smaKey[SMA_KEY_LEN] = {0};
|
||||||
void *pSmaKey = &smaKey;
|
void *pSmaKey = &smaKey;
|
||||||
tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey);
|
tsdbEncodeTSmaKey(tableUid, colId, querySKey, (void **)&pSmaKey);
|
||||||
|
|
||||||
tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
|
tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
|
||||||
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
|
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
|
||||||
|
@ -1010,9 +1046,9 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
|
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
|
||||||
tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) {
|
tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey,
|
if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySKey,
|
||||||
nMaxResult)) < 0) {
|
nMaxResult)) < 0) {
|
||||||
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
|
|
@ -301,7 +301,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *msg = (char *)calloc(100, 1);
|
char *msg = (char *)calloc(1, 100);
|
||||||
assert(msg != NULL);
|
assert(msg != NULL);
|
||||||
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
||||||
|
|
||||||
|
|
|
@ -158,9 +158,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
} while (0)
|
} while (0)
|
||||||
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
|
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
|
||||||
|
|
||||||
|
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
|
||||||
bool cliMayContinueSendMsg(SCliConn* conn) {
|
bool cliMaySendCachedMsg(SCliConn* conn) {
|
||||||
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
if (taosArrayGetSize(conn->cliMsgs) > 0) {
|
||||||
cliSend(conn);
|
cliSend(conn);
|
||||||
return true;
|
return true;
|
||||||
|
@ -226,7 +228,7 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
}
|
}
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
|
|
||||||
if (cliMayContinueSendMsg(conn) == true) {
|
if (cliMaySendCachedMsg(conn) == true) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,7 +443,25 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
free(conn);
|
free(conn);
|
||||||
}
|
}
|
||||||
|
static bool cliHandleNoResp(SCliConn* conn) {
|
||||||
|
bool res = false;
|
||||||
|
SArray* msgs = conn->cliMsgs;
|
||||||
|
if (taosArrayGetSize(msgs) > 0) {
|
||||||
|
SCliMsg* pMsg = taosArrayGetP(msgs, 0);
|
||||||
|
if (REQUEST_NO_RESP(&pMsg->msg)) {
|
||||||
|
taosArrayRemove(msgs, 0);
|
||||||
|
destroyCmsg(pMsg);
|
||||||
|
res = true;
|
||||||
|
}
|
||||||
|
if (res == true) {
|
||||||
|
if (cliMaySendCachedMsg(conn) == false) {
|
||||||
|
SCliThrdObj* thrd = conn->hostThrd;
|
||||||
|
addConnToPool(thrd->pool, conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
static void cliSendCb(uv_write_t* req, int status) {
|
static void cliSendCb(uv_write_t* req, int status) {
|
||||||
SCliConn* pConn = req->data;
|
SCliConn* pConn = req->data;
|
||||||
|
|
||||||
|
@ -452,6 +472,10 @@ static void cliSendCb(uv_write_t* req, int status) {
|
||||||
cliHandleExcept(pConn);
|
cliHandleExcept(pConn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (cliHandleNoResp(pConn) == true) {
|
||||||
|
tTrace("%s cli conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -489,6 +513,7 @@ void cliSend(SCliConn* pConn) {
|
||||||
msgLen += sizeof(STransUserMsg);
|
msgLen += sizeof(STransUserMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pHead->resflag = REQUEST_NO_RESP(pMsg) ? 1 : 0;
|
||||||
pHead->msgType = pMsg->msgType;
|
pHead->msgType = pMsg->msgType;
|
||||||
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
|
||||||
|
|
||||||
|
|
|
@ -226,15 +226,22 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
transMsg.msgType = pHead->msgType;
|
transMsg.msgType = pHead->msgType;
|
||||||
transMsg.code = pHead->code;
|
transMsg.code = pHead->code;
|
||||||
transMsg.ahandle = NULL;
|
transMsg.ahandle = NULL;
|
||||||
transMsg.handle = pConn;
|
transMsg.handle = NULL;
|
||||||
|
|
||||||
transClearBuffer(&pConn->readBuf);
|
transClearBuffer(&pConn->readBuf);
|
||||||
pConn->inType = pHead->msgType;
|
pConn->inType = pHead->msgType;
|
||||||
transRefSrvHandle(pConn);
|
|
||||||
|
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
if (pHead->resflag == 0) {
|
||||||
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
transRefSrvHandle(pConn);
|
||||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
transMsg.handle = pConn;
|
||||||
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
||||||
|
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
|
||||||
|
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||||
|
} else {
|
||||||
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
|
||||||
|
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||||
|
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||||
|
}
|
||||||
|
|
||||||
STrans* pTransInst = (STrans*)p->shandle;
|
STrans* pTransInst = (STrans*)p->shandle;
|
||||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||||
|
|
|
@ -361,7 +361,7 @@ TEST_F(TransEnv, cliPersistHandle) {
|
||||||
tr->SetCliPersistFp(cliPersistHandle);
|
tr->SetCliPersistFp(cliPersistHandle);
|
||||||
SRpcMsg resp = {0};
|
SRpcMsg resp = {0};
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
SRpcMsg req = {.handle = resp.handle};
|
SRpcMsg req = {.handle = resp.handle, .noResp = 0};
|
||||||
req.msgType = 1;
|
req.msgType = 1;
|
||||||
req.pCont = rpcMallocCont(10);
|
req.pCont = rpcMallocCont(10);
|
||||||
req.contLen = 10;
|
req.contLen = 10;
|
||||||
|
@ -448,6 +448,25 @@ TEST_F(TransEnv, srvPersistHandleExcept) {
|
||||||
// conn broken
|
// conn broken
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
TEST_F(TransEnv, cliPersistHandleExcept) {
|
||||||
|
tr->SetSrvContinueSend(processContinueSend);
|
||||||
|
tr->SetCliPersistFp(cliPersistHandle);
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
SRpcMsg req = {.handle = resp.handle};
|
||||||
|
req.msgType = 1;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecv(&req, &resp);
|
||||||
|
if (i > 2) {
|
||||||
|
tr->StopSrv();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosMsleep(2000);
|
||||||
|
// conn broken
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
TEST_F(TransEnv, multiCliPersistHandleExcept) {
|
||||||
// conn broken
|
// conn broken
|
||||||
|
@ -458,5 +477,15 @@ TEST_F(TransEnv, queryExcept) {
|
||||||
// query and conn is broken
|
// query and conn is broken
|
||||||
}
|
}
|
||||||
TEST_F(TransEnv, noResp) {
|
TEST_F(TransEnv, noResp) {
|
||||||
|
SRpcMsg resp = {0};
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
SRpcMsg req = {.noResp = 1};
|
||||||
|
req.msgType = 1;
|
||||||
|
req.pCont = rpcMallocCont(10);
|
||||||
|
req.contLen = 10;
|
||||||
|
tr->cliSendAndRecv(&req, &resp);
|
||||||
|
}
|
||||||
|
taosMsleep(2000);
|
||||||
|
|
||||||
// no resp
|
// no resp
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue