-- stratcon.rollup_metric_numeric_generic(in_roll)
--
-- Processes pending entries of metric_numeric_rollup_queue for the rollup
-- named in_roll and writes the aggregated values into the per-rollup table
-- metric_numeric_rollup_<in_roll>.
--
-- Parameters:
--   in_roll  rollup name; must match a row of
--            stratcon.metric_numeric_rollup_config (column "rollup").
--
-- Returns:
--   0  the queue for in_roll is empty, or another session already holds
--      the advisory lock for this rollup (nothing was done);
--   1  the per-call batch cap was reached while queue entries may remain.
--
-- Raises:
--   an exception when in_roll is not a configured rollup; any error raised
--   while rolling up is re-raised after the advisory lock has been released.
--
-- Concurrency: a session-level advisory lock (43191, <tasklock id>) ensures
-- that only one run per rollup name is active at a time. Session-level
-- advisory locks survive errors, so every exit path releases the lock
-- explicitly.
CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric_generic(in_roll text) RETURNS int AS $$
DECLARE
    -- v_rec receives scalar aggregates (count_rows, avg_value, counter_dev),
    -- so it must be a plain RECORD, not the array-valued segment row type.
    v_rec           RECORD;
    v_segment       stratcon.metric_numeric_rollup_segment%rowtype;
    v_conf          RECORD;
    v_temprec       RECORD;
    v_table         TEXT;              -- quoted name of the per-rollup table
    v_sql           TEXT;
    v_this_roll     TEXT;
    v_min_whence    TIMESTAMPTZ;
    v_max_rollup    TIMESTAMPTZ;
    v_stored_epoch  DOUBLE PRECISION;  -- start of the segment, seconds since epoch
    v_stored_rollup TIMESTAMPTZ;       -- same instant as v_stored_epoch
    v_offset        INTEGER;           -- slot index inside the segment arrays
    v_taskid        INT;
    v_locked        BOOLEAN := FALSE;
    v_init          BOOLEAN := FALSE;
    v_i             SMALLINT := 0;     -- queue entries processed in this call
    v_result        INT := 0;
BEGIN
    -- Get rollup config based on given name, and fail if the name is unknown.
    SELECT * FROM stratcon.metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Given rollup name is invalid! [%]', in_roll;
    END IF;

    -- in_roll has been validated against the config table; quote_ident is
    -- still applied so the concatenated name is always a single identifier.
    v_table := quote_ident('metric_numeric_rollup_' || in_roll);

    -- Get task id - used for locking - based on given roll name.
    v_this_roll := 'rollup_metric_numeric_' || in_roll;
    SELECT id FROM tasklock WHERE "name" = v_this_roll INTO v_taskid;
    IF v_taskid IS NULL THEN
        INSERT INTO tasklock (id, "name")
        VALUES (nextval('tasklock_id_seq'), v_this_roll)
        RETURNING id INTO v_taskid;
    END IF;

    -- Try to lock the task id, so that only one
    -- stratcon.rollup_metric_numeric_generic() runs at a time for this in_roll.
    SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked;
    IF NOT v_locked THEN
        RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll;
        RETURN 0;
    END IF;

    LOOP
        -- Batch cap: stop after a bounded number of queue entries and tell
        -- the caller (result 1) that it stopped because of the cap.
        IF v_i > 10 THEN
            v_result := 1;
            EXIT;
        END IF;

        -- An aggregate always yields exactly one row, so FOUND cannot signal
        -- an empty queue; a NULL minimum does.
        SELECT MIN(whence)
          INTO v_min_whence
          FROM metric_numeric_rollup_queue
         WHERE "interval" = in_roll;
        EXIT WHEN v_min_whence IS NULL;

        EXECUTE 'SELECT MAX(rollup_time) FROM ' || v_table INTO v_max_rollup;

        -- NOTE(review): rows are looked up below by segment start time
        -- (v_stored_rollup), while this delete matches rollup_time against
        -- the queue timestamp itself - confirm this is the intended cleanup.
        IF v_min_whence <= v_max_rollup THEN
            EXECUTE 'DELETE FROM ' || v_table || ' WHERE rollup_time = $1'
              USING v_min_whence;
        END IF;

        -- Queue every rollup that depends on this one.
        -- now() in the following query is just a placeholder to get a named
        -- field (use_whence) in v_temprec.
        FOR v_temprec IN
            SELECT *, now() AS use_whence
              FROM stratcon.metric_numeric_rollup_config
             WHERE dependent_on = in_roll
        LOOP
            -- The following formula gives the equivalent of date_trunc(..)
            -- but works on basically any unit - like "10 minutes". The unit
            -- has to be given in seconds, and provided as v_temprec.seconds.
            -- NOTE(review): this truncates now(), not v_min_whence - confirm
            -- which instant the dependent rollup should be queued for.
            v_temprec.use_whence := 'epoch'::timestamptz
                + '1 second'::INTERVAL * v_temprec.seconds
                  * floor(extract(epoch FROM now()) / v_temprec.seconds);

            -- Poor man's UPSERT :)
            INSERT INTO metric_numeric_rollup_queue ("interval", whence)
            SELECT v_temprec.rollup, v_temprec.use_whence
             WHERE NOT EXISTS (
                       SELECT *
                         FROM metric_numeric_rollup_queue
                        WHERE ("interval", whence) = (v_temprec.rollup, v_temprec.use_whence)
                   );
        END LOOP;

        -- Parameters: $1 = v_min_whence - v_conf.seconds, $2 = v_min_whence,
        -- $3 = name of the rollup this one depends on.
        v_sql := 'SELECT sid, name, $2 as rollup_time, SUM(1) as count_rows, (SUM(avg_value*1)/SUM(1)) as avg_value, (SUM(counter_dev*1)/SUM(1)) as counter_dev
                    FROM stratcon.unroll_metric_numeric( $2, $1, $3)
                   GROUP BY sid, name';

        FOR v_rec IN
            EXECUTE v_sql
              USING v_min_whence - v_conf.seconds * '1 second'::INTERVAL,
                    v_min_whence,
                    v_conf.dependent_on
        LOOP
            -- Start of the segment containing rollup_time (span is in seconds)
            -- and the index of the slot inside that segment. This generalizes
            -- the former fixed form "segment = day, slot = 5 minutes".
            -- NOTE(review): the original expression was
            -- floor(epoch / span) + v_conf.window, which is neither a
            -- timestamp nor a span boundary; confirm whether the "window"
            -- column was meant to shift the boundary.
            v_stored_epoch  := floor(extract('epoch' FROM v_rec.rollup_time) / v_conf.span) * v_conf.span;
            v_stored_rollup := 'epoch'::timestamptz + v_stored_epoch * '1 second'::INTERVAL;
            v_offset        := floor((extract('epoch' FROM v_rec.rollup_time) - v_stored_epoch) / v_conf.seconds);

            EXECUTE 'SELECT * FROM ' || v_table
                 || ' WHERE rollup_time = $1 AND sid = $2 AND name = $3'
               INTO v_segment
              USING v_stored_rollup, v_rec.sid, v_rec.name;

            -- EXECUTE ... INTO sets every field to NULL when no row matched;
            -- a matched row always has non-null sid and name.
            IF v_segment IS NULL THEN
                v_segment := stratcon.init_metric_numeric_rollup_segment(in_roll);
                v_init := TRUE;
                RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup, v_offset;
            END IF;

            v_segment.sid := v_rec.sid;
            v_segment.name := v_rec.name;
            v_segment.count_rows[v_offset] := v_rec.count_rows;
            v_segment.avg_value[v_offset] := v_rec.avg_value;
            v_segment.counter_dev[v_offset] := v_rec.counter_dev;

            -- Values (including the arrays) are passed as bind parameters;
            -- arrays cannot be spliced into the statement text.
            IF v_init THEN
                EXECUTE 'INSERT INTO ' || v_table
                     || ' (sid, name, rollup_time, count_rows, avg_value, counter_dev)'
                     || ' VALUES ($1, $2, $3, $4, $5, $6)'
                  USING v_segment.sid, v_segment.name, v_stored_rollup,
                        v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev;
                v_init := FALSE;
            ELSE
                EXECUTE 'UPDATE ' || v_table
                     || ' SET count_rows = $1, avg_value = $2, counter_dev = $3'
                     || ' WHERE rollup_time = $4 AND sid = $5 AND name = $6'
                  USING v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev,
                        v_stored_rollup, v_rec.sid, v_rec.name;
            END IF;
        END LOOP;

        -- Delete the processed entry from the whence log table.
        DELETE FROM metric_numeric_rollup_queue
         WHERE whence = v_min_whence
           AND "interval" = in_roll;

        v_i := v_i + 1;
    END LOOP;

    PERFORM pg_advisory_unlock(43191, v_taskid);
    v_locked := FALSE;

    RETURN v_result;

EXCEPTION
    WHEN OTHERS THEN
        -- Release the lock only if this call acquired it, then re-raise the
        -- original error unchanged.
        IF v_locked THEN
            PERFORM pg_advisory_unlock(43191, v_taskid);
        END IF;
        RAISE;
END
$$ LANGUAGE plpgsql;