-- stratcon.rollup_metric_numeric(in_roll)
--
-- Processes queued rollup windows for the rollup named in_roll.
--
-- For every queued window (metric_numeric_rollup_queue rows whose "interval"
-- equals in_roll and whose whence lies before the current, still-open window)
-- the function:
--   1. queues the enclosing window of every rollup configured as dependent
--      on in_roll,
--   2. computes per-(sid, name) aggregates for the window, either through
--      stratcon.window_robust_derive() (when in_roll = '5m') or by aggregating
--      stratcon.unroll_metric_numeric() output of the rollup it depends on,
--   3. stores the aggregates at the computed array offset of the matching
--      metric_numeric_rollup_<in_roll> row (inserting the row if missing),
--   4. removes the window from the queue.
--
-- Parameters:
--   in_roll  rollup name; must exist in metric_numeric_rollup_config.rollup.
--
-- Returns:
--   0  another session holds the advisory lock for this rollup, or the queue
--      was drained;
--   1  the per-call batch limit was reached (v_i exceeded 12 processed
--      windows) and queued windows may remain.
--
-- Raises:
--   an exception when in_roll is not a configured rollup name; any other
--   error is re-raised unchanged after the advisory lock has been released.
CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric(in_roll text) RETURNS int AS $$
DECLARE
    v_rec               stratcon.metric_numeric_rollup_segment%rowtype;
    v_segment           stratcon.metric_numeric_rollup%rowtype;
    v_conf              RECORD;
    v_sql               TEXT;
    v_current_whence    TIMESTAMPTZ;
    v_min_whence        TIMESTAMPTZ;
    v_taskid            INTEGER;
    v_locked            BOOLEAN;
    v_this_roll         TEXT;
    v_stored_rollup     INTEGER;
    v_stored_rollup_tm  TIMESTAMPTZ;
    v_offset            INTEGER;
    v_init              BOOLEAN := FALSE;
    -- Number of queue windows processed by this call. Must start at 0:
    -- a NULL counter makes "v_i > 12" NULL and the batch limit never fires.
    v_i                 SMALLINT := 0;
    v_temprec           RECORD;
    v_count             INTEGER;
BEGIN

    -- Get rollup config based on given name, and fail if the name is unknown.
    SELECT * FROM metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Given rollup name is invalid! [%]', in_roll;
    END IF;

    -- Get task id - used for locking - based on given roll name.
    v_this_roll := 'rollup_metric_numeric_' || in_roll;
    SELECT id FROM tasklock WHERE "name" = v_this_roll INTO v_taskid;
    IF v_taskid IS NULL THEN
        INSERT INTO tasklock (id, "name")
        VALUES (nextval('tasklock_id_seq'), v_this_roll)
        RETURNING id INTO v_taskid;
    END IF;

    -- Try to lock the task id, to make sure only one
    -- stratcon.rollup_metric_numeric() runs at a time for this in_roll.
    SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked;
    IF v_locked = false THEN
        RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll;
        RETURN 0;
    END IF;

    -- Start of the window containing now(); only windows strictly before it
    -- are processed.
    v_current_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_conf.seconds * floor(extract( epoch FROM now() ) / v_conf.seconds);

    LOOP
        -- Batch limit: stop after enough windows and tell the caller (1).
        IF v_i > 12 THEN
            PERFORM pg_advisory_unlock(43191, v_taskid);
            RETURN 1;
        END IF;

        SELECT MIN(whence) FROM metric_numeric_rollup_queue
         WHERE "interval" = in_roll AND whence < v_current_whence INTO v_min_whence;
        EXIT WHEN NOT FOUND OR v_min_whence IS NULL;

        -- v_min_whence is selected as use_whence only to get a named,
        -- assignable field in v_temprec; it is overwritten below.
        -- NOTE(review): this reads noit.metric_numeric_rollup_config while the
        -- config lookup above uses the unqualified name - confirm both refer
        -- to the same table.
        FOR v_temprec IN SELECT *, v_min_whence AS use_whence
                           FROM noit.metric_numeric_rollup_config WHERE dependent_on = in_roll
        LOOP
            -- Following formula gives equivalent of date_trunc(..) but working on basically any unit - like "10 minutes"
            -- The unit has to be given in seconds, AND provided as v_temprec.seconds
            v_temprec.use_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_temprec.seconds * floor(extract( epoch FROM v_temprec.use_whence ) / v_temprec.seconds);

            RAISE NOTICE '(%,%)',v_temprec.rollup, v_temprec.use_whence;
            -- Poor mans UPSERT :)
            INSERT INTO metric_numeric_rollup_queue ("interval", whence)
            SELECT v_temprec.rollup, v_temprec.use_whence
             WHERE NOT EXISTS (
                SELECT * FROM metric_numeric_rollup_queue WHERE ( "interval", whence ) = ( v_temprec.rollup, v_temprec.use_whence )
             );
        END LOOP;

        IF in_roll = '5m' THEN
            v_sql := 'SELECT * FROM stratcon.window_robust_derive('||quote_literal(v_min_whence)||')';
        ELSE
            -- Weighted averages over the rollup this one depends on.
            -- NULLIF guards the divisor: a group whose row counts sum to 0
            -- yields NULL instead of aborting the run with division by zero.
            v_sql := 'SELECT sid, name, '||quote_literal(v_min_whence)||' as rollup_time, ';
            v_sql := v_sql || ' SUM(coalesce(count_rows, 0)) as count_rows, ';
            v_sql := v_sql || ' (SUM(avg_value*coalesce(count_rows,0))/NULLIF(SUM(coalesce(count_rows, 0)), 0)) as avg_value,';
            v_sql := v_sql || ' (SUM(counter_dev*coalesce(count_rows,0))/NULLIF(SUM(coalesce(count_rows, 0)), 0)) as counter_dev ';
            v_sql := v_sql || ' FROM stratcon.unroll_metric_numeric('||quote_literal(v_min_whence)||',';
            v_sql := v_sql || quote_literal(v_min_whence + (v_conf.seconds - 1) * '1 second'::interval) || ',' || quote_literal(v_conf.dependent_on) ||')';
            v_sql := v_sql || ' GROUP BY sid, name';
        END IF;
        RAISE NOTICE 'v_sql was (%)',v_sql;

        FOR v_rec IN EXECUTE v_sql LOOP
            -- Stored rows cover v_conf.span seconds each; find the row this
            -- window belongs to and the window's slot inside that row.
            v_stored_rollup := floor( extract('epoch' from v_rec.rollup_time) / v_conf.span ) * v_conf.span;
            v_stored_rollup_tm := 'epoch'::timestamptz + v_stored_rollup * '1 second'::interval;
            v_offset := floor( ( extract('epoch' from v_rec.rollup_time) - v_stored_rollup) / v_conf.seconds );

            -- The table name cannot be a bind parameter; in_roll was validated
            -- against metric_numeric_rollup_config above. Values are bound.
            v_sql := 'SELECT * FROM metric_numeric_rollup_'||in_roll
                  || ' WHERE rollup_time = $1 AND sid = $2 AND name = $3';

            EXECUTE v_sql INTO v_segment USING v_stored_rollup_tm, v_rec.sid, v_rec.name;
            GET DIAGNOSTICS v_count = ROW_COUNT;
            IF v_count = 0 THEN
                v_segment := stratcon.init_metric_numeric_rollup( in_roll );
                v_init := true;
                RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup_tm, v_offset;
            END IF;

            v_segment.sid := v_rec.sid;
            v_segment.name := v_rec.name;
            v_segment.count_rows[v_offset] := v_rec.count_rows;
            v_segment.avg_value[v_offset] := v_rec.avg_value;
            v_segment.counter_dev[v_offset] := v_rec.counter_dev;

            IF v_init THEN
                v_sql := 'INSERT INTO metric_numeric_rollup_'||in_roll||' (sid,name,rollup_time,count_rows,avg_value,counter_dev)
                          VALUES ($1,$2,$3,$4,$5,$6)';
                EXECUTE v_sql USING v_segment.sid, v_segment.name, v_stored_rollup_tm, v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev;
                v_init := false;
            ELSE
                v_sql := 'UPDATE metric_numeric_rollup_'||in_roll;
                v_sql := v_sql || ' SET (count_rows,avg_value,counter_dev) = ($1,$2,$3)';
                v_sql := v_sql || ' WHERE rollup_time = $4 AND sid = $5 AND name = $6';
                EXECUTE v_sql USING v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev, v_stored_rollup_tm, v_segment.sid, v_segment.name;
            END IF;
        END LOOP;

        -- Delete from whence log table
        DELETE FROM metric_numeric_rollup_queue WHERE whence=v_min_whence AND "interval"=in_roll;

        v_min_whence := NULL;

        -- One more queue window done; checked at the top of the loop.
        v_i := v_i + 1;
    END LOOP;

    PERFORM pg_advisory_unlock(43191, v_taskid);

    RETURN 0;

EXCEPTION
    WHEN OTHERS THEN
        -- Advisory locks survive the rollback of this block, so release the
        -- lock explicitly - but only if this call actually acquired it
        -- (v_locked is still NULL when the failure happened earlier).
        IF v_locked THEN
            PERFORM pg_advisory_unlock(43191, v_taskid);
        END IF;
        -- Re-raise the original error. Falling out of the handler without a
        -- RETURN would replace it with "control reached end of function
        -- without RETURN".
        RAISE;
END
$$ LANGUAGE plpgsql
SECURITY DEFINER
;