feat(query): add tail function
This commit is contained in:
parent
3ef067ff03
commit
4be158b391
|
@ -884,7 +884,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getTailFuncEnv,
|
||||
.initFunc = tailFunctionSetup,
|
||||
.processFunc = tailFunction,
|
||||
.finalizeFunc = NULL
|
||||
.finalizeFunc = tailFinalize
|
||||
},
|
||||
{
|
||||
.name = "abs",
|
||||
|
|
|
@ -164,10 +164,10 @@ typedef struct SSampleInfo {
|
|||
int64_t *timestamp;
|
||||
} SSampleInfo;
|
||||
|
||||
typedef struct STailUnit {
|
||||
typedef struct STailItem {
|
||||
int64_t timestamp;
|
||||
char data[];
|
||||
} STailUnit;
|
||||
} STailItem;
|
||||
|
||||
typedef struct STailInfo {
|
||||
int32_t numOfPoints;
|
||||
|
@ -175,7 +175,7 @@ typedef struct STailInfo {
|
|||
int32_t offset;
|
||||
uint8_t colType;
|
||||
int16_t colBytes;
|
||||
STailUnit **pRes;
|
||||
STailItem **pItems;
|
||||
} STailInfo;
|
||||
|
||||
#define SET_VAL(_info, numOfElem, res) \
|
||||
|
@ -3164,7 +3164,7 @@ bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
int32_t numOfPoints = pVal->datum.i;
|
||||
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailUnit) + pCol->node.resType.bytes);
|
||||
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -3184,36 +3184,37 @@ bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
pInfo->pRes = (STailUnit **)((char *)pInfo + sizeof(STailInfo));
|
||||
char *pUnit = (char *)pInfo->pRes + pInfo->numOfPoints * POINTER_BYTES;
|
||||
pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo));
|
||||
char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES;
|
||||
|
||||
size_t unitSize = sizeof(STailUnit) + pInfo->colBytes;
|
||||
size_t unitSize = sizeof(STailItem) + pInfo->colBytes;
|
||||
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||
pInfo->pRes[i] = (STailUnit *)(pUnit + i * unitSize);
|
||||
pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static void tailAssignResult(STailUnit* pUnit, char *data, int32_t colBytes, TSKEY ts) {
|
||||
pUnit->timestamp = ts;
|
||||
memcpy(pUnit->data, data, colBytes);
|
||||
static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts) {
|
||||
pItem->timestamp = ts;
|
||||
memcpy(pItem->data, data, colBytes);
|
||||
}
|
||||
|
||||
static int32_t tailCompFn(const void *p1, const void *p2, const void *param) {
|
||||
STailUnit *d1 = *(STailUnit **)p1;
|
||||
STailUnit *d2 = *(STailUnit **)p2;
|
||||
STailItem *d1 = *(STailItem **)p1;
|
||||
STailItem *d2 = *(STailItem **)p2;
|
||||
return compareInt64Val(&d1->timestamp, &d2->timestamp);
|
||||
}
|
||||
|
||||
static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts) {
|
||||
STailUnit **pList = pInfo->pRes;
|
||||
STailItem **pList = pInfo->pItems;
|
||||
if (pInfo->numAdded < pInfo->numOfPoints) {
|
||||
tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts);
|
||||
taosheapsort((void *)pList, sizeof(STailUnit **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
|
||||
taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
|
||||
pInfo->numAdded++;
|
||||
} else if (pList[0]->timestamp < ts) {
|
||||
tailAssignResult(pList[0], data, pInfo->colBytes, ts);
|
||||
taosheapadjust((void *)pList, sizeof(STailUnit **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
|
||||
taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3231,7 +3232,7 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t startOffset = pCtx->offset;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
//colDataAppendNULL(pOutput, i);
|
||||
colDataAppendNULL(pOutput, i);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -3239,5 +3240,34 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) {
|
|||
doTailAdd(pInfo, data, tsList[i]);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
|
||||
int32_t pos = startOffset + i;
|
||||
STailItem *pItem = pInfo->pItems[i];
|
||||
colDataAppend(pOutput, pos, pItem->data, false);
|
||||
}
|
||||
|
||||
return pInfo->numOfPoints;
|
||||
}
|
||||
|
||||
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
pEntryInfo->complete = true;
|
||||
|
||||
int32_t type = pCtx->input.pData[0]->info.type;
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
// todo assign the tag value and the corresponding row data
|
||||
int32_t currentRow = pBlock->info.rows;
|
||||
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
|
||||
STailItem *pItem = pInfo->pItems[i];
|
||||
colDataAppend(pCol, currentRow, pItem->data, false);
|
||||
|
||||
//setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
|
||||
currentRow += 1;
|
||||
}
|
||||
|
||||
return pEntryInfo->numOfRes;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue