| 1 | | CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric_generic(in_roll text) |
|---|
| 2 | | RETURNS int AS |
|---|
| 3 | | $$ |
|---|
| 4 | | DECLARE |
|---|
| 5 | | v_rec stratcon.metric_numeric_rollup_segment%rowtype; |
|---|
| 6 | | v_segment stratcon.metric_numeric_rollup_segment%rowtype; |
|---|
| 7 | | v_conf RECORD; |
|---|
| 8 | | v_sql TEXT; |
|---|
| 9 | | v_min_whence TIMESTAMPTZ; |
|---|
| 10 | | v_max_rollup TIMESTAMPTZ; |
|---|
| 11 | | v_whence TIMESTAMPTZ; |
|---|
| 12 | | whenceint RECORD; |
|---|
| 13 | | v_taskid int; |
|---|
| 14 | | v_locked boolean; |
|---|
| 15 | | v_this_roll text; |
|---|
| 16 | | v_stored_rollup timestamptz; |
|---|
| 17 | | v_offset integer; |
|---|
| 18 | | v_init boolean := FALSE ; |
|---|
| 19 | | v_i smallint; |
|---|
| | 1 | CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric_generic(in_roll text) RETURNS int AS $$ |
|---|
| | 2 | DECLARE |
|---|
| | 3 | v_rec stratcon.metric_numeric_rollup_segment%rowtype; |
|---|
| | 4 | v_segment stratcon.metric_numeric_rollup_segment%rowtype; |
|---|
| | 5 | v_conf RECORD; |
|---|
| | 6 | v_sql TEXT; |
|---|
| | 7 | v_min_whence TIMESTAMPTZ; |
|---|
| | 8 | v_max_rollup TIMESTAMPTZ; |
|---|
| | 9 | v_whence TIMESTAMPTZ; |
|---|
| | 10 | v_taskid INT; |
|---|
| | 11 | v_locked BOOLEAN; |
|---|
| | 12 | v_this_roll TEXT; |
|---|
| | 13 | v_stored_rollup TIMESTAMPTZ; |
|---|
| | 14 | v_offset INTEGER; |
|---|
| | 15 | v_init BOOLEAN := FALSE; |
|---|
| | 16 | v_i SMALLINT; |
|---|
| | 17 | v_temprec RECORD; |
|---|
| 21 | | |
|---|
| 22 | | v_this_roll := 'rollup_metric_numeric_'||in_roll; |
|---|
| 23 | | SELECT id FROM tasklock WHERE name = v_this_roll INTO v_taskid; |
|---|
| 24 | | IF v_taskid IS NULL THEN |
|---|
| 25 | | INSERT INTO tasklock (id, name) VALUES (nextval('tasklock_id_seq'), v_this_roll) |
|---|
| 26 | | RETURNING id into v_taskid; |
|---|
| 27 | | END IF; |
|---|
| 28 | | |
|---|
| 29 | | SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked; |
|---|
| 30 | | |
|---|
| 31 | | IF v_locked = false THEN |
|---|
| 32 | | RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll; |
|---|
| 33 | | RETURN 0; |
|---|
| 34 | | END IF; |
|---|
| 35 | | |
|---|
| 36 | | SELECT * FROM stratcon.metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf; |
|---|
| 37 | | |
|---|
| 38 | | LOOP |
|---|
| 39 | | IF v_i > 10 THEN |
|---|
| 40 | | RETURN 1; |
|---|
| 41 | | END IF; |
|---|
| 42 | | |
|---|
| 43 | | v_sql := 'SELECT MIN(whence) FROM metric_numeric_rollup_queue WHERE interval='||in_roll; |
|---|
| 44 | | EXECUTE v_sql INTO v_min_whence; |
|---|
| 45 | | |
|---|
| 46 | | v_sql := 'SELECT MAX(rollup_time) FROM numeric_metric_rollup_'||in_roll; |
|---|
| 47 | | EXECUTE v_sql INTO v_max_rollup; |
|---|
| 48 | | |
|---|
| 49 | | /* |
|---|
| 50 | | -- Insert Log for Hourly rollup |
|---|
| 51 | | |
|---|
| 52 | | SELECT whence FROM stratcon.log_whence_s WHERE whence=date_trunc('H',v_min_whence) and interval='1 hour' |
|---|
| 53 | | INTO v_whence; |
|---|
| 54 | | IF NOT FOUND THEN |
|---|
| 55 | | INSERT INTO stratcon.log_whence_s VALUES(date_trunc('H',v_min_whence),'1 hour'); |
|---|
| 56 | | END IF; |
|---|
| 57 | | */ |
|---|
| 58 | | |
|---|
| 59 | | IF v_min_whence <= v_max_rollup THEN |
|---|
| 60 | | v_sql := 'DELETE FROM numeric_metric_rollup_'||in_roll||' WHERE rollup_time = '||quote_ident(v_min_whence); |
|---|
| 61 | | EXECUTE v_sql; |
|---|
| 62 | | |
|---|
| | 19 | -- Get rollup config based on given name, and fail if its wrong name. |
|---|
| | 20 | SELECT * FROM stratcon.metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf; |
|---|
| | 21 | IF NOT FOUND THEN |
|---|
| | 22 | raise exception 'Given rollup name is invalid! [%]', in_roll; |
|---|
| 65 | | /* THIS V_SQL NEEDS TO BE REWRITTEN TO GET THE VALUE FROM ARRAY BASED TABLES */ |
|---|
| 66 | | v_sql := 'SELECT sid, name, '|| v_min_whence || ' as rollup_time, |
|---|
| 67 | | SUM(1) as count_rows ,(SUM(avg_value*1)/SUM(1)) as avg_value, |
|---|
| 68 | | (SUM(counter_dev*1)/SUM(1)) as counter_dev |
|---|
| 69 | | FROM metric_numeric_rollup_'||v_conf.dependent_on||' |
|---|
| 70 | | WHERE rollup_time<= '|| v_min_whence ||' AND rollup_time > ' || v_min_whence - v_conf.seconds * '1 second'::interval || ' |
|---|
| 71 | | GROUP BY sid, name'; |
|---|
| | 25 | -- Get task id - used for locking - based on given roll name |
|---|
| | 26 | v_this_roll := 'rollup_metric_numeric_'||in_roll; |
|---|
| | 27 | SELECT id FROM tasklock WHERE "name" = v_this_roll INTO v_taskid; |
|---|
| | 28 | IF v_taskid IS NULL THEN |
|---|
| | 29 | INSERT INTO tasklock (id, "name") VALUES (nextval('tasklock_id_seq'), v_this_roll) RETURNING id into v_taskid; |
|---|
| | 30 | END IF; |
|---|
| 73 | | FOR v_rec IN EXECUTE v_sql LOOP |
|---|
| 74 | | v_stored_rollup := floor(extract('epoch' from v_rec.rollup_time)/v_conf.span)+vconf.window; |
|---|
| 75 | | v_offset := floor( (extract('epoch' from v_rec.rollup_time) - v_stored_rollup) / v_conf.seconds ); |
|---|
| 76 | | |
|---|
| | 32 | -- Try to lock task_id - to make sure only one stratcon.rollup_metric_numeric_generic() runs at a time for this particular in_roll. |
|---|
| | 33 | SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked; |
|---|
| | 34 | IF v_locked = false THEN |
|---|
| | 35 | RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll; |
|---|
| | 36 | RETURN 0; |
|---|
| | 37 | END IF; |
|---|
| | 38 | |
|---|
| | 39 | LOOP |
|---|
| | 40 | IF v_i > 10 THEN |
|---|
| | 41 | RETURN 1; |
|---|
| | 42 | END IF; |
|---|
| | 43 | |
|---|
| | 44 | SELECT MIN(whence) FROM metric_numeric_rollup_queue WHERE "interval" = in_roll; |
|---|
| | 45 | EXIT WHEN NOT FOUND; |
|---|
| | 46 | |
|---|
| | 47 | v_sql := 'SELECT MAX(rollup_time) FROM metric_numeric_rollup_' || in_roll; |
|---|
| | 48 | EXECUTE v_sql INTO v_max_rollup; |
|---|
| | 49 | |
|---|
| | 50 | IF v_min_whence <= v_max_rollup THEN |
|---|
| | 51 | v_sql := 'DELETE FROM metric_numeric_rollup_'||in_roll||' WHERE rollup_time = '||quote_literal(v_min_whence); |
|---|
| | 52 | EXECUTE v_sql; |
|---|
| | 53 | END IF; |
|---|
| | 54 | |
|---|
| | 55 | -- now() in following query is just a placeholder to get named field (use_whence) in temprec. |
|---|
| | 56 | FOR v_temprec IN SELECT *, now() as use_whence FROM noit.metric_numeric_rollup_config WHERE dependent_on = in_roll LOOP |
|---|
| | 57 | -- Following formula gives equivalent of date_trunc(..) but working on basically any unit - like "10 minutes" |
|---|
| | 58 | -- The unit has to be given in seconds, AND provided as v_temprec.seconds |
|---|
| | 59 | v_temprec.use_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_temprec.seconds * floor(extract( epoch FROM now() ) / v_temprec.seconds); |
|---|
| | 60 | |
|---|
| | 61 | -- Poor mans UPSERT :) |
|---|
| | 62 | INSERT INTO metric_numeric_rollup_queue ("interval", whence) |
|---|
| | 63 | SELECT v_temprec.rollup, v_temprec.use_whence |
|---|
| | 64 | WHERE NOT EXISTS ( |
|---|
| | 65 | SELECT * FROM metric_numeric_rollup_queue WHERE ( "INTERVAL", whence ) = ( v_temprec.rollup, v_temprec.use_whence ) |
|---|
| | 66 | ); |
|---|
| | 67 | END LOOP; |
|---|
| | 68 | |
|---|
| | 69 | v_sql := 'SELECT sid, name, $2 as rollup_time, SUM(1) as count_rows, (SUM(avg_value*1)/SUM(1)) as avg_value, (SUM(counter_dev*1)/SUM(1)) as counter_dev |
|---|
| | 70 | FROM stratcon.unroll_metric_numeric( $2, $1, $3) |
|---|
| | 71 | GROUP BY sid, name'; |
|---|
| | 72 | |
|---|
| | 73 | FOR v_rec IN EXECUTE v_sql USING v_min_whence - v_conf.seconds * '1 second'::INTERVAL, v_min_whence, v_conf.dependent_on LOOP |
|---|
| | 74 | v_stored_rollup := floor( extract('epoch' from v_rec.rollup_time) / v_conf.span ) + v_conf.window; |
|---|
| | 75 | v_offset := floor( ( extract('epoch' from v_rec.rollup_time) - v_stored_rollup) / v_conf.seconds ); |
|---|
| | 76 | |
|---|
| 86 | | v_sql := 'SELECT * FROM stratcon.init_metric_numeric_rollup_segment('||quote_literal(in_roll)||')'; |
|---|
| 87 | | EXECUTE v_sql INTO v_segment; |
|---|
| 88 | | v_init := true; |
|---|
| 89 | | RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup, v_offset; |
|---|
| | 86 | v_segment := stratcon.init_metric_numeric_rollup_segment( in_roll ); |
|---|
| | 87 | v_init := true; |
|---|
| | 88 | RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup, v_offset; |
|---|