root/sql/sprocs/rollup_metric_numeric_generic.sql

Revision fa6feca15943c95436cb86bd016aee5944928a27, 6.2 kB (checked in by Depesz Lubaczewski <depesz@omniti.com>, 5 years ago)

initial commit of reviewed function, with added handling of triggering upper-level rollups, and reading from unroll function

  • Property mode set to 100644
Line 
-- stratcon.rollup_metric_numeric_generic(in_roll)
--
-- Processes the queued rollup times ("whence") for the rollup level named by
-- in_roll.  For every queued whence it:
--   1. removes an already stored row for that time, if any,
--   2. queues the rollup levels that declare in_roll as their dependent_on,
--   3. aggregates the source data via stratcon.unroll_metric_numeric() and
--      stores the per-(sid, name) results into the matching slot of the
--      segment row in metric_numeric_rollup_<in_roll>,
--   4. removes the processed whence from metric_numeric_rollup_queue.
--
-- Parameters:
--   in_roll  name of a row in stratcon.metric_numeric_rollup_config (column "rollup").
--
-- Returns:
--   0  the queue for in_roll was drained, another session already holds the
--      lock for this rollup, or a non-RAISE error was reported as a NOTICE;
--   1  the per-call cap on processed queue entries was reached and entries
--      may remain in the queue.
--
-- Raises:
--   an exception when in_roll is not a configured rollup name; exceptions
--   raised with RAISE EXCEPTION inside the body are re-raised with their
--   original message.
CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric_generic(in_roll text) RETURNS int AS $$
DECLARE
    -- v_rec is a plain RECORD: the aggregate query below yields scalar
    -- count_rows/avg_value/counter_dev, while the segment row type holds
    -- arrays in those fields (they are subscripted further down).
    v_rec           RECORD;
    v_segment       stratcon.metric_numeric_rollup_segment%rowtype;
    v_conf          RECORD;
    v_sql           TEXT;
    v_table         TEXT;
    v_min_whence    TIMESTAMPTZ;
    v_max_rollup    TIMESTAMPTZ;
    v_taskid        INT;
    v_locked        BOOLEAN := FALSE;
    v_this_roll     TEXT;
    v_stored_rollup TIMESTAMPTZ;
    v_offset        INTEGER;
    v_init          BOOLEAN := FALSE;
    v_i             SMALLINT := 0;
    v_temprec       RECORD;
BEGIN
    -- Get rollup config based on given name, and fail if the name is unknown.
    SELECT * FROM stratcon.metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf;
    IF NOT FOUND THEN
        raise exception 'Given rollup name is invalid! [%]', in_roll;
    END IF;

    -- in_roll has been matched against the config table above, so from here on
    -- it is only ever a configured rollup name when spliced into a table name.
    v_table := 'metric_numeric_rollup_' || in_roll;

    -- Get task id - used for locking - based on given roll name
    v_this_roll := 'rollup_metric_numeric_'||in_roll;
    SELECT id FROM tasklock WHERE "name" = v_this_roll INTO v_taskid;
    IF v_taskid IS NULL THEN
        INSERT INTO tasklock (id, "name") VALUES (nextval('tasklock_id_seq'), v_this_roll) RETURNING id into v_taskid;
    END IF;

    -- Try to lock task_id - to make sure only one stratcon.rollup_metric_numeric_generic() runs at a time for this particular in_roll.
    v_locked := pg_try_advisory_lock(43191, v_taskid);
    IF NOT v_locked THEN
        RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll;
        RETURN 0;
    END IF;

    LOOP
        -- Cap the amount of work done in a single call.  The lock has to be
        -- released explicitly on this path as well, otherwise it stays held.
        IF v_i > 10 THEN
            PERFORM pg_advisory_unlock(43191, v_taskid);
            RETURN 1;
        END IF;

        -- MIN() always returns exactly one row, so FOUND cannot be used to
        -- detect an empty queue; test the fetched value instead.
        SELECT MIN(whence) INTO v_min_whence FROM metric_numeric_rollup_queue WHERE "interval" = in_roll;
        EXIT WHEN v_min_whence IS NULL;

        EXECUTE 'SELECT MAX(rollup_time) FROM ' || v_table INTO v_max_rollup;

        -- Comparison is NULL (hence skipped) when the rollup table is empty.
        IF v_min_whence <= v_max_rollup THEN
            EXECUTE 'DELETE FROM ' || v_table || ' WHERE rollup_time = $1' USING v_min_whence;
        END IF;

        -- Queue the rollups that are built on top of this one.
        -- now() in following query is just a placeholder to get named field (use_whence) in temprec.
        -- NOTE(review): the original read noit.metric_numeric_rollup_config here while the
        -- lookup at the top of the function reads the stratcon schema; made consistent - confirm.
        FOR v_temprec IN SELECT *, now() as use_whence FROM stratcon.metric_numeric_rollup_config WHERE dependent_on = in_roll LOOP
            -- Following formula gives equivalent of date_trunc(..) but working on basically any unit - like "10 minutes"
            -- The unit has to be given in seconds, AND provided as v_temprec.seconds
            v_temprec.use_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_temprec.seconds * floor(extract( epoch FROM now() ) / v_temprec.seconds);

            -- Poor mans UPSERT :)
            INSERT INTO metric_numeric_rollup_queue ("interval", whence)
                SELECT v_temprec.rollup, v_temprec.use_whence
                WHERE NOT EXISTS (
                    SELECT * FROM metric_numeric_rollup_queue WHERE ( "interval", whence ) = ( v_temprec.rollup, v_temprec.use_whence )
                );
        END LOOP;

        -- $1 = v_min_whence - v_conf.seconds, $2 = v_min_whence, $3 = v_conf.dependent_on
        v_sql := 'SELECT sid, name, $2 as rollup_time, SUM(1) as count_rows, (SUM(avg_value*1)/SUM(1)) as avg_value, (SUM(counter_dev*1)/SUM(1)) as counter_dev
                  FROM  stratcon.unroll_metric_numeric( $2, $1, $3)
                  GROUP BY sid, name';

        FOR v_rec IN EXECUTE v_sql USING v_min_whence - v_conf.seconds * '1 second'::INTERVAL, v_min_whence, v_conf.dependent_on LOOP
            -- Start of the stored segment: rollup_time truncated to a multiple of v_conf.span
            -- seconds (same truncation formula as used for use_whence above).
            -- NOTE(review): the original assigned floor(epoch / span) + v_conf.window straight
            -- to this TIMESTAMPTZ, which cannot work.  What v_conf.window is meant to
            -- contribute is not visible in this file, so it is no longer used - confirm.
            v_stored_rollup := 'epoch'::timestamptz
                             + '1 second'::INTERVAL * v_conf.span * floor( extract('epoch' from v_rec.rollup_time) / v_conf.span );
            -- Slot of this rollup_time inside the segment, in units of v_conf.seconds.
            v_offset        := floor( ( extract('epoch' from v_rec.rollup_time) - extract('epoch' from v_stored_rollup) ) / v_conf.seconds );

            EXECUTE 'SELECT * FROM ' || v_table || ' WHERE rollup_time = $1 AND sid = $2 AND name = $3'
                INTO v_segment
                USING v_stored_rollup, v_rec.sid, v_rec.name;

            -- EXECUTE ... INTO sets the target to NULLs when no row matched; a matched
            -- row always has a non-NULL sid because it was compared with "=".
            v_init := v_segment.sid IS NULL;
            IF v_init THEN
                v_segment := stratcon.init_metric_numeric_rollup_segment( in_roll );
                RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup, v_offset;
            END IF;

            v_segment.sid                   := v_rec.sid;
            v_segment.name                  := v_rec.name;
            v_segment.count_rows[v_offset]  := v_rec.count_rows;
            v_segment.avg_value[v_offset]   := v_rec.avg_value;
            v_segment.counter_dev[v_offset] := v_rec.counter_dev;

            -- Values (including the arrays) are passed as parameters rather than
            -- being concatenated into the statement text.
            IF v_init THEN
                EXECUTE 'INSERT INTO ' || v_table || ' (sid, name, rollup_time, count_rows, avg_value, counter_dev)
                         VALUES ($1, $2, $3, $4, $5, $6)'
                    USING v_segment.sid, v_segment.name, v_stored_rollup,
                          v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev;
            ELSE
                -- Write back the whole updated arrays, not just the new scalar values.
                EXECUTE 'UPDATE ' || v_table || '
                         SET count_rows = $1, avg_value = $2, counter_dev = $3
                         WHERE rollup_time = $4 AND sid = $5 AND name = $6'
                    USING v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev,
                          v_stored_rollup, v_rec.sid, v_rec.name;
            END IF;
        END LOOP;

        -- Delete from whence log table
        DELETE FROM metric_numeric_rollup_queue WHERE whence = v_min_whence AND "interval" = in_roll;

        -- One more queue entry done.
        -- NOTE(review): the original incremented inside the per-metric loop although it was
        -- indented at this level; counting queue entries is assumed to be the intent - confirm.
        v_i := v_i + 1;
    END LOOP;

    PERFORM pg_advisory_unlock(43191, v_taskid);

    RETURN 0;

EXCEPTION
    -- The advisory lock is not released by the rollback of this block, so both
    -- handlers release it - but only if this call actually acquired it.
    WHEN RAISE_EXCEPTION THEN
        IF v_locked THEN
            PERFORM pg_advisory_unlock(43191, v_taskid);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        IF v_locked THEN
            PERFORM pg_advisory_unlock(43191, v_taskid);
        END IF;
        RAISE NOTICE '%', SQLERRM;
        -- Without an explicit RETURN the function would fail here with
        -- "control reached end of function without RETURN".
        -- NOTE(review): 0 is assumed to be the intended result for a swallowed error.
        RETURN 0;
END
$$ LANGUAGE plpgsql;
Note: See TracBrowser for help on using the browser.