Changeset fa6feca15943c95436cb86bd016aee5944928a27

Show
Ignore:
Timestamp:
10/23/09 18:42:47 (4 years ago)
Author:
Depesz Lubaczewski <depesz@omniti.com>
git-committer:
Depesz Lubaczewski <depesz@omniti.com> 1256323367 +0000
git-parent:

[31d5dd7ae5a1a551b1d6b995ac08588191019405]

git-author:
Depesz Lubaczewski <depesz@omniti.com> 1256323367 +0000
Message:

initial commit of reviewed function, with added handling of triggering upper-level rollups, and reading from unroll function

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • sql/sprocs/rollup_metric_numeric_generic.sql

    rf0c2368 rfa6feca  
    1 CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric_generic(in_roll text) 
    2 RETURNS int AS 
    3 $$ 
    4 DECLARE   
    5  v_rec stratcon.metric_numeric_rollup_segment%rowtype; 
    6  v_segment stratcon.metric_numeric_rollup_segment%rowtype; 
    7  v_conf RECORD;  
    8  v_sql TEXT; 
    9  v_min_whence TIMESTAMPTZ; 
    10  v_max_rollup TIMESTAMPTZ; 
    11  v_whence TIMESTAMPTZ; 
    12  whenceint RECORD; 
    13  v_taskid int; 
    14  v_locked boolean; 
    15  v_this_roll text;  
    16  v_stored_rollup timestamptz;  
    17  v_offset integer;  
    18  v_init boolean := FALSE ;  
    19  v_i smallint; 
     1CREATE OR REPLACE FUNCTION stratcon.rollup_metric_numeric_generic(in_roll text) RETURNS int AS $$ 
     2DECLARE 
     3    v_rec           stratcon.metric_numeric_rollup_segment%rowtype; 
     4    v_segment       stratcon.metric_numeric_rollup_segment%rowtype; 
     5    v_conf          RECORD; 
     6    v_sql           TEXT; 
     7    v_min_whence    TIMESTAMPTZ; 
     8    v_max_rollup    TIMESTAMPTZ; 
     9    v_whence        TIMESTAMPTZ; 
     10    v_taskid        INT; 
     11    v_locked        BOOLEAN; 
     12    v_this_roll     TEXT; 
     13    v_stored_rollup TIMESTAMPTZ; 
     14    v_offset        INTEGER; 
     15    v_init          BOOLEAN := FALSE; 
     16    v_i             SMALLINT; 
     17    v_temprec       RECORD; 
    2018BEGIN 
    21  
    22 v_this_roll := 'rollup_metric_numeric_'||in_roll; 
    23 SELECT id FROM tasklock WHERE name = v_this_roll INTO v_taskid; 
    24 IF v_taskid IS NULL THEN 
    25     INSERT INTO tasklock (id, name) VALUES (nextval('tasklock_id_seq'), v_this_roll) 
    26       RETURNING id into v_taskid; 
    27 END IF; 
    28  
    29 SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked; 
    30  
    31 IF v_locked = false THEN 
    32     RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll; 
    33     RETURN 0; 
    34 END IF; 
    35  
    36 SELECT * FROM stratcon.metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf;  
    37  
    38 LOOP 
    39     IF v_i > 10 THEN 
    40         RETURN 1; 
    41     END IF;  
    42  
    43     v_sql := 'SELECT MIN(whence) FROM metric_numeric_rollup_queue WHERE interval='||in_roll;  
    44     EXECUTE v_sql INTO v_min_whence; 
    45          
    46     v_sql := 'SELECT MAX(rollup_time) FROM numeric_metric_rollup_'||in_roll; 
    47     EXECUTE v_sql INTO v_max_rollup;         
    48  
    49 /*  
    50  -- Insert Log for Hourly rollup 
    51     
    52    SELECT whence FROM stratcon.log_whence_s WHERE whence=date_trunc('H',v_min_whence) and interval='1 hour' 
    53            INTO v_whence; 
    54       IF NOT FOUND THEN 
    55        INSERT INTO  stratcon.log_whence_s VALUES(date_trunc('H',v_min_whence),'1 hour'); 
    56       END IF; 
    57 */ 
    58   
    59     IF v_min_whence <= v_max_rollup THEN 
    60         v_sql := 'DELETE FROM numeric_metric_rollup_'||in_roll||' WHERE rollup_time = '||quote_ident(v_min_whence);   
    61         EXECUTE v_sql;  
    62   
     19    -- Get rollup config based on given name, and fail if its wrong name. 
     20    SELECT * FROM stratcon.metric_numeric_rollup_config WHERE rollup = in_roll INTO v_conf; 
     21    IF NOT FOUND THEN 
     22        raise exception 'Given rollup name is invalid! [%]', in_roll; 
    6323    END IF; 
    6424 
    65 /* THIS V_SQL NEEDS TO BE REWRITTEN TO GET THE VALUE FROM ARRAY BASED TABLES */  
    66     v_sql := 'SELECT sid, name, '|| v_min_whence || ' as rollup_time,  
    67                      SUM(1) as count_rows ,(SUM(avg_value*1)/SUM(1)) as avg_value,  
    68                      (SUM(counter_dev*1)/SUM(1)) as counter_dev  
    69               FROM metric_numeric_rollup_'||v_conf.dependent_on||'  
    70               WHERE rollup_time<= '|| v_min_whence ||' AND rollup_time > ' || v_min_whence - v_conf.seconds * '1 second'::interval || ' 
    71               GROUP BY sid, name'; 
     25    -- Get task id - used for locking - based on given roll name 
     26    v_this_roll := 'rollup_metric_numeric_'||in_roll; 
     27    SELECT id FROM tasklock WHERE "name" = v_this_roll INTO v_taskid; 
     28    IF v_taskid IS NULL THEN 
     29        INSERT INTO tasklock (id, "name") VALUES (nextval('tasklock_id_seq'), v_this_roll) RETURNING id into v_taskid; 
     30    END IF; 
    7231 
    73     FOR v_rec IN EXECUTE v_sql LOOP 
    74             v_stored_rollup := floor(extract('epoch' from v_rec.rollup_time)/v_conf.span)+vconf.window;  
    75             v_offset := floor( (extract('epoch' from v_rec.rollup_time) - v_stored_rollup) / v_conf.seconds ); 
    76   
     32    -- Try to lock task_id - to make sure only one stratcon.rollup_metric_numeric_generic() runs at a time for this particular in_roll. 
     33    SELECT pg_try_advisory_lock(43191, v_taskid) INTO v_locked; 
     34    IF v_locked = false THEN 
     35        RAISE NOTICE 'rollup for metric numeric (%) already running', in_roll; 
     36        RETURN 0; 
     37    END IF; 
     38 
     39    LOOP 
     40        IF v_i > 10 THEN 
     41            RETURN 1; 
     42        END IF; 
     43 
     44        SELECT MIN(whence) FROM metric_numeric_rollup_queue WHERE "interval" = in_roll; 
     45        EXIT WHEN NOT FOUND; 
     46 
     47        v_sql := 'SELECT MAX(rollup_time) FROM metric_numeric_rollup_' || in_roll; 
     48        EXECUTE v_sql INTO v_max_rollup; 
     49 
     50        IF v_min_whence <= v_max_rollup THEN 
     51            v_sql := 'DELETE FROM metric_numeric_rollup_'||in_roll||' WHERE rollup_time = '||quote_literal(v_min_whence); 
     52            EXECUTE v_sql; 
     53        END IF; 
     54 
     55        -- now() in following query is just a placeholder to get named field (use_whence) in temprec. 
     56        FOR v_temprec IN SELECT *, now() as use_whence FROM noit.metric_numeric_rollup_config WHERE dependent_on = in_roll LOOP 
     57            -- Following formula gives equivalent of date_trunc(..) but working on basically any unit - like "10 minutes" 
     58            -- The unit has to be given in seconds, AND provided as v_temprec.seconds 
     59            v_temprec.use_whence := 'epoch'::timestamptz + '1 second'::INTERVAL * v_temprec.seconds * floor(extract( epoch FROM now() ) / v_temprec.seconds); 
     60 
     61            -- Poor mans UPSERT :) 
     62            INSERT INTO metric_numeric_rollup_queue ("interval", whence) 
     63                SELECT v_temprec.rollup, v_temprec.use_whence 
     64                WHERE NOT EXISTS ( 
     65                    SELECT * FROM metric_numeric_rollup_queue WHERE ( "INTERVAL", whence ) = ( v_temprec.rollup, v_temprec.use_whence ) 
     66                ); 
     67        END LOOP; 
     68 
     69        v_sql := 'SELECT sid, name, $2 as rollup_time, SUM(1) as count_rows, (SUM(avg_value*1)/SUM(1)) as avg_value, (SUM(counter_dev*1)/SUM(1)) as counter_dev 
     70                  FROM  stratcon.unroll_metric_numeric( $2, $1, $3) 
     71                  GROUP BY sid, name'; 
     72 
     73        FOR v_rec IN EXECUTE v_sql USING v_min_whence - v_conf.seconds * '1 second'::INTERVAL, v_min_whence, v_conf.dependent_on LOOP 
     74            v_stored_rollup := floor( extract('epoch' from v_rec.rollup_time) / v_conf.span ) + v_conf.window; 
     75            v_offset        := floor( ( extract('epoch' from v_rec.rollup_time) - v_stored_rollup) / v_conf.seconds ); 
     76 
    7777            --v_offset := ( 12*(extract('hour' from v_info.rollup_time))+floor(extract('minute' from v_info.rollup_time)/5) ); 
    7878            --v_stored_rollup := v_info.rollup_time::date; 
     
    8484            EXECUTE v_sql INTO v_segment; 
    8585            IF v_segment IS NOT NULL THEN 
    86                 v_sql := 'SELECT * FROM stratcon.init_metric_numeric_rollup_segment('||quote_literal(in_roll)||')'; 
    87                 EXECUTE v_sql INTO v_segment;  
    88                 v_init := true;  
    89                 RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup, v_offset;  
     86                v_segment := stratcon.init_metric_numeric_rollup_segment( in_roll ); 
     87                v_init := true; 
     88                RAISE NOTICE 'didnt find sid %, name %, rollup_time %, offset %', v_rec.sid, v_rec.name, v_stored_rollup, v_offset; 
    9089            END IF; 
    91     
    92             v_segment.sid := v_rec.sid; 
    93             v_segment.name := v_rec.name; 
    94             v_segment.count_rows[v_offset] := v_rec.count_rows; 
    95             v_segment.avg_value[v_offset] := v_rec.avg_value; 
     90 
     91            v_segment.sid                   := v_rec.sid; 
     92            v_segment.name                  := v_rec.name; 
     93            v_segment.count_rows[v_offset] := v_rec.count_rows; 
     94            v_segment.avg_value[v_offset]   := v_rec.avg_value; 
    9695            v_segment.counter_dev[v_offset] := v_rec.counter_dev; 
    9796 
    9897            IF v_init THEN 
    99                 v_sql := 'INSERT INTO metric_numeric_rollup_'||in_roll||' (sid,name,rollup_time,count_rows,avg_value,counter_dev)  
     98                v_sql := 'INSERT INTO metric_numeric_rollup_'||in_roll||' (sid,name,rollup_time,count_rows,avg_value,counter_dev) 
    10099                    VALUES ('|| v_segment.sid||','||quote_literal(v_segment.name)||','||quote_literal(v_stored_rollup)||','||v_segment.count_rows 
    101100                    ||','||v_segment.avg_value||','||v_segment.counter_dev||')'; 
    102                 EXECUTE v_sql;  
     101                EXECUTE v_sql; 
    103102                v_init := false; 
    104103            ELSE 
    105104                v_sql := 'UPDATE metric_numeric_rollup_'||in_roll; 
    106                 v_sql := v_sql || 'SET (count_rows,avg_value,counter_dev) = ('||v_rec.count_rows||','||v_rec.avg_value||','||v_rec.counter_dev||')';  
     105                v_sql := v_sql || 'SET (count_rows,avg_value,counter_dev) = ('||v_rec.count_rows||','||v_rec.avg_value||','||v_rec.counter_dev||')'; 
    107106                v_sql := v_sql || 'WHERE rollup_time = '||v_stored_rollup||'  AND sid = '||v_info.sid||'  AND name = '||quote_literal(v_info.name); 
    108107            END IF; 
    109108 
    110     v_i := v_i + 1;  
    111     END LOOP;  
     109        v_i := v_i + 1; 
     110        END LOOP; 
    112111 
    113   -- Delete from whence log table 
    114    
    115   DELETE FROM metric_numeric_rollup_queue WHERE WHENCE=v_min_whence AND INTERVAL=in_roll; 
    116   
    117   v_min_whence := NULL; 
    118   v_max_rollup := NULL; 
     112        -- Delete from whence log table 
    119113 
    120  END LOOP
     114        DELETE FROM metric_numeric_rollup_queue WHERE WHENCE=v_min_whence AND INTERVAL=in_roll
    121115 
    122   perform pg_advisory_unlock(43191, v_taskid); 
     116        v_min_whence := NULL; 
     117        v_max_rollup := NULL; 
    123118 
    124   RETURN 0; 
     119    END LOOP; 
     120 
     121    perform pg_advisory_unlock(43191, v_taskid); 
     122 
     123    RETURN 0; 
    125124 
    126125EXCEPTION 
     
    132131       RAISE NOTICE '%', SQLERRM; 
    133132END 
    134 $$ LANGUAGE plpgsql;  
     133$$ LANGUAGE plpgsql;