root/sql/sprocs/stratcon.rollup_metric_numeric.sql

Revision 51b2a89650aa59a060f72e44e0936e37650c04d6, 7.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

Cherry pick two fixed from whizz

[4d04680929d4d03879c7a9d30830f6eeea39a338]
Add function to drop old metrics data

[6a78a0b46f8ca626ab9978b5a6ea7ef031a4bfa3]
Cosmetics, change few NOTICEs to DEBUGs

  • Property mode set to 100644
Line 
1 CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric(in_roll text) RETURNS int AS $$
2 DECLARE
3     v_rec           stratcon.metric_numeric_rollup_segment%rowtype;
4     v_segment       stratcon.metric_numeric_rollup%rowtype;
5     v_conf          RECORD;
6     v_sql           TEXT;
7     v_current_whence TIMESTAMPTZ;
8     v_min_whence    TIMESTAMPTZ;
9     v_whence        TIMESTAMPTZ;
10     v_taskid        INTEGER;
11     v_locked        BOOLEAN;
12     v_this_roll     TEXT;
13     v_stored_rollup INTEGER;
14     v_stored_rollup_tm TIMESTAMPTZ;
15     v_offset        INTEGER;
16     v_init          BOOLEAN := FALSE;
17     v_i             SMALLINT := 0;
18     v_temprec       RECORD;
19     v_count         INTEGER;
20     v_max_segs      INTEGER := 12; -- maximum number of segments to process in one go
21 BEGIN
22
23     -- Get rollup config based on given name, and fail if its wrong name.
24     SELECT * FROM metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf;
25     IF NOT FOUND THEN
26         RAISE EXCEPTION 'Given rollup name is invalid! [%]', in_roll;
27     END IF;
28
29     -- Get task id - used for locking - based on given roll name
30     v_this_roll := 'rollup_metric_numeric_'||in_roll;
31     SELECT id FROM tasklock WHERE "name" = v_this_roll INTO v_taskid;
32     IF v_taskid IS NULL THEN
33         INSERT INTO tasklock (id, "name") VALUES (nextval('tasklock_id_seq'), v_this_roll) RETURNING id into v_taskid;
34     END IF;
35
36     -- Try to lock task_id - to make sure only one stratcon.rollup_metric_numeric_generic() runs at a time for this particular in_roll.
37     SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked;
38     IF v_locked = false THEN
39         RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll;
40         RETURN 0;
41     END IF;
42
43     v_current_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_conf.seconds * floor(extract( epoch FROM now() ) / v_conf.seconds);
44
45     LOOP
46         IF v_i > v_max_segs THEN
47             PERFORM pg_advisory_unlock(43191, v_taskid);
48             RAISE NOTICE 'processed % segments, exiting', v_i - 1;
49             RETURN 1;
50         END IF;
51
52         SELECT MIN(whence) FROM metric_numeric_rollup_queue
53          WHERE "interval" = in_roll and whence < v_current_whence INTO v_min_whence;
54         EXIT WHEN NOT FOUND OR v_min_whence IS NULL;
55
56         -- now() in following query is just a placeholder to get named field (use_whence) in temprec.
57         FOR v_temprec IN SELECT *, v_min_whence as use_whence
58                            FROM noit.metric_numeric_rollup_config WHERE dependent_on = in_roll
59         LOOP
60             -- Following formula gives equivalent of date_trunc(..) but working on basically any unit - like "10 minutes"
61             -- The unit has to be given in seconds, AND provided as v_temprec.seconds
62             v_temprec.use_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_temprec.seconds * floor(extract( epoch FROM v_temprec.use_whence ) / v_temprec.seconds);
63
64             RAISE NOTICE 'queueing for rollup: interval = %, whence = %',v_temprec.rollup, v_temprec.use_whence; 
65             -- Poor mans UPSERT :)
66             INSERT INTO metric_numeric_rollup_queue ("interval", whence)
67                 SELECT v_temprec.rollup, v_temprec.use_whence
68                 WHERE NOT EXISTS (
69                     SELECT * FROM metric_numeric_rollup_queue WHERE ( "interval", whence ) = ( v_temprec.rollup, v_temprec.use_whence )
70                 );
71         END LOOP;
72
73         RAISE NOTICE 'processing: interval = %, whence = %', in_roll, v_min_whence;
74         IF in_roll = '5m' THEN
75             v_sql := 'SELECT * FROM stratcon.window_robust_derive('||quote_literal(v_min_whence)||')';
76         ELSE
77             v_sql := 'SELECT sid, name, '||quote_literal(v_min_whence)||' as rollup_time, ';
78             v_sql := v_sql || ' SUM(coalesce(count_rows, 0)) as count_rows, ';
79             v_sql := v_sql || ' (SUM(avg_value*coalesce(count_rows,0))/SUM(coalesce(count_rows, 0))) as avg_value,';
80             v_sql := v_sql || ' (SUM(counter_dev*coalesce(count_rows,0))/SUM(coalesce(count_rows, 0))) as counter_dev ';
81             v_sql := v_sql || ' FROM stratcon.unroll_metric_numeric('||quote_literal(v_min_whence)||',';
82             v_sql := v_sql || quote_literal(v_min_whence + (v_conf.seconds - 1) * '1 second'::interval) || ',' || quote_literal(v_conf.dependent_on) ||')';
83             v_sql := v_sql || ' GROUP BY sid, name'; 
84         END IF;
85         RAISE DEBUG 'v_sql was (%)',v_sql;
86
87         FOR v_rec IN EXECUTE v_sql LOOP
88             v_stored_rollup := floor( extract('epoch' from v_rec.rollup_time) / v_conf.span ) * v_conf.span;
89             v_stored_rollup_tm := 'epoch'::timestamptz + v_stored_rollup * '1 second'::interval;
90             v_offset        := floor( ( extract('epoch' from v_rec.rollup_time) - v_stored_rollup) / v_conf.seconds );
91
92             v_sql := 'SELECT * FROM metric_numeric_rollup_'||in_roll||' WHERE rollup_time = '||quote_literal(v_stored_rollup_tm);
93             v_sql := v_sql ||' and sid='||v_rec.sid||' and name = '|| quote_literal(v_rec.name);
94
95             EXECUTE v_sql INTO v_segment;
96             GET DIAGNOSTICS v_count = ROW_COUNT;
97             IF v_count = 0 THEN
98                 v_segment := stratcon.init_metric_numeric_rollup( in_roll );
99                 v_init := true;
100                 RAISE DEBUG 'didn''t find, inserting: sid = %, name = %, rollup_time = %, offset = %', v_rec.sid, v_rec.name, v_stored_rollup_tm, v_offset;
101             END IF;
102
103             v_segment.sid                   := v_rec.sid;
104             v_segment.name                  := v_rec.name;
105             v_segment.count_rows[v_offset]  := v_rec.count_rows;
106             v_segment.avg_value[v_offset]   := v_rec.avg_value;
107             v_segment.counter_dev[v_offset] := v_rec.counter_dev;
108
109             IF v_init THEN
110                 v_sql := 'INSERT INTO metric_numeric_rollup_'||in_roll||' (sid,name,rollup_time,count_rows,avg_value,counter_dev)
111                     VALUES ($1,$2,$3,$4,$5,$6)';
112                 EXECUTE v_sql USING v_segment.sid, v_segment.name, v_stored_rollup_tm, v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev;
113                 v_init := false;
114             ELSE
115                 v_sql := 'UPDATE metric_numeric_rollup_'||in_roll;
116                 v_sql := v_sql || ' SET (count_rows,avg_value,counter_dev) = ($1,$2,$3)';
117                 v_sql := v_sql || ' WHERE rollup_time = $4  AND sid = $5 AND name = $6';
118                 EXECUTE v_sql USING v_segment.count_rows, v_segment.avg_value, v_segment.counter_dev, v_stored_rollup_tm, v_segment.sid, v_segment.name;
119             END IF;
120
121         END LOOP;
122
123         v_i := v_i + 1;
124
125         -- Delete from whence log table
126         DELETE FROM metric_numeric_rollup_queue WHERE whence=v_min_whence AND "interval"=in_roll;
127         RAISE NOTICE 'done, removed from queue: interval = %, whence = %', in_roll, v_min_whence;
128        
129         v_min_whence := NULL;
130     END LOOP;
131
132     perform pg_advisory_unlock(43191, v_taskid);
133
134     RETURN 0;
135
136 EXCEPTION
137     WHEN RAISE_EXCEPTION THEN
138        perform pg_advisory_unlock(43191, v_taskid);
139        RAISE EXCEPTION '%', SQLERRM;
140     WHEN OTHERS THEN
141        perform pg_advisory_unlock(43191, v_taskid);
142        RAISE NOTICE '%', SQLERRM;
143 END
144 $$ LANGUAGE plpgsql
145 SECURITY DEFINER
146 ;
Note: See TracBrowser for help on using the browser.