feat(query): add sample function
This commit is contained in:
parent
d24254fb84
commit
315a875208
|
@ -155,6 +155,7 @@ typedef struct SSampleInfo {
|
||||||
int32_t samples;
|
int32_t samples;
|
||||||
int32_t totalPoints;
|
int32_t totalPoints;
|
||||||
int32_t numSampled;
|
int32_t numSampled;
|
||||||
|
uint8_t colType;
|
||||||
int16_t colBytes;
|
int16_t colBytes;
|
||||||
char *data;
|
char *data;
|
||||||
int64_t *timestamp;
|
int64_t *timestamp;
|
||||||
|
@ -3063,7 +3064,8 @@ bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo)
|
||||||
pInfo->samples = pCtx->param[1].param.i;
|
pInfo->samples = pCtx->param[1].param.i;
|
||||||
pInfo->totalPoints = 0;
|
pInfo->totalPoints = 0;
|
||||||
pInfo->numSampled = 0;
|
pInfo->numSampled = 0;
|
||||||
pInfo->colBytes = ((SColumnInfoData*)pCtx->pOutput)->info.bytes;
|
pInfo->colType = pCtx->resDataInfo.type;
|
||||||
|
pInfo->colBytes = pCtx->resDataInfo.bytes;
|
||||||
if (pInfo->samples < 1 || pInfo->samples > SAMPLE_MAX_POINTS_NUM) {
|
if (pInfo->samples < 1 || pInfo->samples > SAMPLE_MAX_POINTS_NUM) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3073,22 +3075,20 @@ bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sampleAssignResult(SColumnInfoData *pOutput, SSampleInfo* pInfo,
|
static void sampleAssignResult(SSampleInfo* pInfo, char *data, TSKEY ts, int32_t index) {
|
||||||
char *data, TSKEY ts, int32_t index) {
|
assignVal(pInfo->data + index * pInfo->colBytes, data, pInfo->colBytes, pInfo->colType);
|
||||||
assignVal(pInfo->data + index * pInfo->colBytes, data, pOutput->info.bytes, pOutput->info.type);
|
|
||||||
*(pInfo->timestamp + index) = ts;
|
*(pInfo->timestamp + index) = ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doReservoirSample(SColumnInfoData *pOutput, SSampleInfo* pInfo,
|
static void doReservoirSample(SSampleInfo* pInfo, char *data, TSKEY ts, int32_t index) {
|
||||||
char *data, TSKEY ts, int32_t index) {
|
|
||||||
pInfo->totalPoints++;
|
pInfo->totalPoints++;
|
||||||
if (pInfo->numSampled < pInfo->samples) {
|
if (pInfo->numSampled < pInfo->samples) {
|
||||||
sampleAssignResult(pOutput, pInfo, data, ts, pInfo->numSampled);
|
sampleAssignResult(pInfo, data, ts, pInfo->numSampled);
|
||||||
pInfo->numSampled++;
|
pInfo->numSampled++;
|
||||||
} else {
|
} else {
|
||||||
int32_t j = taosRand() % (pInfo->totalPoints);
|
int32_t j = taosRand() % (pInfo->totalPoints);
|
||||||
if (j < pInfo->samples) {
|
if (j < pInfo->samples) {
|
||||||
sampleAssignResult(pOutput, pInfo, data, ts, j);
|
sampleAssignResult(pInfo, data, ts, j);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3113,7 +3113,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
doReservoirSample(pOutput, pInfo, data, tsList[i], i);
|
doReservoirSample(pInfo, data, tsList[i], i);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pInfo->numSampled;
|
return pInfo->numSampled;
|
||||||
|
|
Loading…
Reference in New Issue