root/trunk/contrib/pg_scoreboard/pg_scoreboard.c

Revision 54, 15.2 kB (checked in by jesus, 4 years ago)

pull this in and give it to the world

Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * Copyright (c) 2007, Message Systems, Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  *
33  * Author: Theo Schlossnagle
34  *
35  */
36
37 #include <time.h>
38 #include <sys/time.h>
39
40 #include "postgres.h"
41 #include "funcapi.h"
42 #include "fmgr.h"
43 #include "miscadmin.h"
44 #include "pgstat.h"
45 #include "storage/lwlock.h"
46 #include "storage/shmem.h"
47 #include "storage/ipc.h"
48
49 #ifdef PG_MODULE_MAGIC
50 PG_MODULE_MAGIC;
51 #endif
52
/* Number of chains in the pid hash (size of pg_scoreboard.nodehash). */
#define HASH_BUCKETS 128
/* Number of preallocated list nodes and records (nodeset / recordset). */
#define MAX_SLOTS    1024
55
/*
 * Fallback two-argument minimum / maximum, defined only when no earlier
 * header supplied them.  Each argument may be evaluated more than once.
 */
#if !defined(MIN)
#define MIN(a, b)               (((a) < (b)) ? (a) : (b))
#endif
#if !defined(MAX)
#define MAX(a, b)               (((a) > (b)) ? (a) : (b))
#endif
62
63 void _PG_init(void);
64 Datum process_register(PG_FUNCTION_ARGS);
65 Datum process_deregister(PG_FUNCTION_ARGS);
66 Datum process_status(PG_FUNCTION_ARGS);
67 Datum process_scoreboard(PG_FUNCTION_ARGS);
68 static void setup_pg_scoreboard();
69
70 static int registered_onexit = 0;
71
/*
 * One node of a hash-bucket chain.  Unused nodes are chained through the
 * same next_nodesetid field to form the node free list.
 */
struct psr_ll {
  /* index into scoreboard->recordset of the record this node refers to */
  short recordid;
  /* index into scoreboard->nodeset of the next node; negative ends the chain */
  short next_nodesetid;
};
76
/*
 * Sizing of a scoreboard record.  The status message length is what is
 * left of PG_SCOREBOARD_RECORD_LEN after subtracting the sizes of the other
 * record fields (two timevals, the pid key, client address, description
 * and port) plus one byte for the message terminator.
 */
#define PG_SCOREBOARD_RECORD_LEN 512
#define PG_SCOREBOARD_MAX_CLIENT_ADDR_LEN 15
#define PG_SCOREBOARD_MAX_DESCRIPTION 63
#define PG_SCOREBOARD_MAX_STATUS_MESSAGE_LEN \
        (PG_SCOREBOARD_RECORD_LEN - \
         sizeof(struct timeval) - sizeof(struct timeval) - \
         sizeof(unsigned int) - (PG_SCOREBOARD_MAX_CLIENT_ADDR_LEN + 1) - \
         (PG_SCOREBOARD_MAX_DESCRIPTION + 1) - sizeof(unsigned short) - 1)
85
/* Lookup key of a scoreboard record: the pid of the owning process. */
typedef struct {
  unsigned int   procpid;
} pg_scoreboard_record_key;
89
/* One scoreboard entry, describing a registered process. */
typedef struct {
  /* set when the record is created; a zero tv_sec marks a slot as unused */
  struct timeval create_time;
  /* refreshed each time the status message is updated */
  struct timeval last_update;
  pg_scoreboard_record_key key;
  /* NUL-terminated client address, or "local" when none was supplied */
  char           client_addr[PG_SCOREBOARD_MAX_CLIENT_ADDR_LEN + 1];
  unsigned short client_port;
  /* NUL-terminated free-form description supplied at registration */
  char           client_description[PG_SCOREBOARD_MAX_DESCRIPTION + 1];
  /* NUL-terminated latest status text */
  char           status_message[PG_SCOREBOARD_MAX_STATUS_MESSAGE_LEN + 1];
} pg_scoreboard_record;
99
/*
 * View cast over a recordset slot while that slot sits on the free list
 * (see FREERECORDPTR).  next_recordid is the index of the next free slot;
 * a negative value ends the list.
 */
typedef struct {
  struct timeval last_update;
  short next_recordid;
} pg_scoreboard_free_record;
104
/* How this works:
 * We have a hash table with buckets that are chains of nodes (psr_ll).
 * This hash is called nodehash.
 * We're working in shared memory and we're tight on space, so everything
 * is preallocated.  The linked list of nodes is nodeset.  The records they
 * reference are in recordset.
 * As we need to "allocate" the nodes, we use the freelist_nodesetid.
 * As we need to "allocate" the records, we use the freelist_recordsetid.
 */
114
/* The complete scoreboard as laid out in shared memory. */
typedef struct {
  /* lightweight lock guarding every field below */
  LWLockId lockid;
  /* head of the free list of nodeset entries; negative when exhausted */
  short freelist_nodesetid;
  /* head of the free list of recordset entries; negative when exhausted */
  short freelist_recordsetid;
  /* per-bucket index of the first chain node; -1 for an empty bucket */
  short nodehash[HASH_BUCKETS];
  struct psr_ll nodeset[MAX_SLOTS];
  pg_scoreboard_record recordset[MAX_SLOTS];
} pg_scoreboard;

/* This backend's pointer to the shared scoreboard; NULL until attached. */
static pg_scoreboard *scoreboard = NULL;
125
/*
 * Translate a slot index into a pointer into the shared scoreboard arrays.
 * A negative index is the "no such slot" marker and yields NULL.
 * The argument is parenthesized so that compound expressions (for example
 * a conditional) expand correctly; it is still evaluated more than once.
 */
#define NODEPTR(id) (((id) < 0)?NULL:&scoreboard->nodeset[(id)])
#define RECORDPTR(id) (((id) < 0)?NULL:&scoreboard->recordset[(id)])
#define FREERECORDPTR(id) ((pg_scoreboard_free_record *)(((id) < 0)?NULL:&scoreboard->recordset[(id)]))
129
/* These routines all require the caller to have locked the scoreboard */
131 static short alloc_recordid() {
132   short recordid;
133   pg_scoreboard_free_record *record;
134
135   recordid = scoreboard->freelist_recordsetid;
136   record = FREERECORDPTR(recordid);
137   if(record) {
138     scoreboard->freelist_recordsetid = record->next_recordid;
139   }
140   return recordid;
141 }
142 static void free_recordid(short recordsetid) {
143   pg_scoreboard_free_record *record;
144   record = FREERECORDPTR(recordsetid);
145   record->next_recordid = scoreboard->freelist_recordsetid;
146   scoreboard->freelist_recordsetid = recordsetid;
147 }
148 static short alloc_nodeid() {
149   struct psr_ll *node;
150   short nodesetid;
151
152   nodesetid = scoreboard->freelist_nodesetid;
153   node = NODEPTR(nodesetid);
154   if(node) {
155     scoreboard->freelist_nodesetid = node->next_nodesetid;
156   }
157   return nodesetid;
158 }
159 static void free_nodeid(short nodesetid) {
160   struct psr_ll *node;
161   node = NODEPTR(nodesetid);
162   node->next_nodesetid = scoreboard->freelist_nodesetid;
163   scoreboard->freelist_nodesetid = nodesetid;
164 }
165
166 static void delete_record(int pid) {
167   pg_scoreboard_record *record;
168   short bucket;
169   short nodesetid;
170   short recordid;
171   struct psr_ll *node = NULL, *prev_node = NULL;
172   bucket = pid % HASH_BUCKETS;
173
174   for(nodesetid = scoreboard->nodehash[bucket], node = NODEPTR(nodesetid);
175       node;
176       nodesetid = node->next_nodesetid, node = NODEPTR(nodesetid)) {
177     record = RECORDPTR(node->recordid);
178     if(record->key.procpid == pid) break;
179     prev_node = node;
180   }
181   if(node) {
182     /* Match found */
183     if(prev_node)   /* detach it from the list */
184       prev_node->next_nodesetid = node->next_nodesetid;
185     else            /* or from the front of the list */
186       scoreboard->nodehash[bucket] = node->next_nodesetid;
187     recordid = node->recordid;
188     memset(record, 0, sizeof(*record));
189     free_recordid(recordid);
190     free_nodeid(nodesetid);
191   }
192 }
193 static pg_scoreboard_record *find_record(int pid, int create) {
194   pg_scoreboard_record *record;
195   short bucket;
196   short nodesetid;
197   short recordid;
198   struct psr_ll *node = NULL;
199   bucket = pid % HASH_BUCKETS;
200
201   for(node = NODEPTR(scoreboard->nodehash[bucket]);
202       node;
203       node = NODEPTR(node->next_nodesetid)) {
204     record = RECORDPTR(node->recordid);
205     if(record->key.procpid == pid) return record;
206   }
207
208   if(!create) return NULL;
209
210   recordid = alloc_recordid();
211   record = RECORDPTR(recordid);
212   if(!record) return NULL;
213   memset(record, 0, sizeof(*record));
214   record->key.procpid = pid;
215   nodesetid = alloc_nodeid();
216   node = NODEPTR(nodesetid);
217   if(!node) {
218     free_recordid(recordid);
219     return NULL;
220   }
221   node->recordid = recordid;
222   node->next_nodesetid = scoreboard->nodehash[bucket];
223   scoreboard->nodehash[bucket] = nodesetid;
224   gettimeofday(&record->create_time, NULL);
225   memcpy(&record->last_update, &record->create_time, sizeof(struct timeval));
226   return record;
227 }
228
229 static void exit_cb(int code, unsigned long unused) {
230   LWLockAcquire(scoreboard->lockid, LW_EXCLUSIVE);
231
232   delete_record(MyProcPid);
233  
234   LWLockRelease(scoreboard->lockid);
235 }
236
237 void _PG_init() {
238   RequestAddinShmemSpace(sizeof(*scoreboard));
239   RequestAddinLWLocks(1);
240 }
241
242 static void setup_pg_scoreboard() {
243   if (!scoreboard) {
244     int     i;
245     bool    found;
246
247     LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
248     scoreboard = ShmemInitStruct("pg_scoreboard", sizeof(*scoreboard), &found);
249
250     if (!scoreboard)
251       elog(ERROR, "out of shared memory");
252     if (!found) {
253       pg_scoreboard_free_record *freeset;
254
255       scoreboard->lockid = LWLockAssign();
256
257       for(i=0; i<HASH_BUCKETS; i++)    /* "null" out the hash bucket */
258         scoreboard->nodehash[i] = -1;
259
260       /* point at node 0
261        * then point each node
262        * at the next one and
263        * the last one to nowhere
264        */
265       scoreboard->freelist_nodesetid = 0;
266       for(i=0; i<MAX_SLOTS-1; i++)
267         scoreboard->nodeset[i].next_nodesetid = i+1;
268       scoreboard->nodeset[MAX_SLOTS-1].next_nodesetid = -1;
269
270       /* point at record 0
271        * then point each record
272        * at the next one and
273        * the last one to nowhere
274        */
275       scoreboard->freelist_recordsetid = 0;
276       memset(scoreboard->recordset, 0,
277              sizeof(*scoreboard->recordset) * MAX_SLOTS);
278       for(i=0; i<MAX_SLOTS-1; i++) {
279         freeset = (pg_scoreboard_free_record *)&scoreboard->recordset[i];
280         freeset->next_recordid = i+1;
281       }
282       ((pg_scoreboard_free_record *)&scoreboard->recordset[MAX_SLOTS-1])->next_recordid = -1;
283     }
284     LWLockRelease(AddinShmemInitLock);
285   }
286 }
287
288 PG_FUNCTION_INFO_V1(process_deregister);
289 Datum
290 process_deregister(PG_FUNCTION_ARGS) {
291   setup_pg_scoreboard();
292  
293   LWLockAcquire(scoreboard->lockid, LW_EXCLUSIVE);
294
295   delete_record(MyProcPid);
296  
297   LWLockRelease(scoreboard->lockid);
298
299   PG_RETURN_VOID();
300 }
301
302 PG_FUNCTION_INFO_V1(process_register);
303 Datum
304 process_register(PG_FUNCTION_ARGS) {
305   pg_scoreboard_record    *record;
306   text *client_addr = NULL;
307   int   client_addr_len;
308   int32 client_port = 0;
309   text *client_description = NULL;
310   int   client_description_len;
311  
312   setup_pg_scoreboard();
313
314   if(!registered_onexit) {
315     on_shmem_exit(exit_cb, 0);
316     registered_onexit = 1;
317   }
318  
319   LWLockAcquire(scoreboard->lockid, LW_EXCLUSIVE);
320
321   if(!PG_ARGISNULL(0)) client_addr = PG_GETARG_VARCHAR_P(0);
322   if(!PG_ARGISNULL(1)) client_port = PG_GETARG_INT32(1);
323   if(!PG_ARGISNULL(2)) client_description = PG_GETARG_VARCHAR_P(2);
324
325   record = find_record(MyProcPid, 1);
326
327   if(!record) {
328     LWLockRelease(scoreboard->lockid);
329     elog(WARNING, "Cannot allocate scoreboard record");
330     PG_RETURN_VOID();
331   }
332   record->client_port = client_port;
333
334   /* Fill out the client_addr */
335   client_addr_len = client_addr?
336     MIN(VARSIZE(client_addr) - VARHDRSZ, PG_SCOREBOARD_MAX_CLIENT_ADDR_LEN):
337     0;
338   if(client_addr_len) {
339     memcpy(record->client_addr, VARDATA(client_addr), client_addr_len);
340     record->client_addr[client_addr_len] = '\0';
341   }
342   else
343     strcpy(record->client_addr, "local");
344
345   /* Fill out the client_addr */
346   client_description_len = client_description?
347     MIN(VARSIZE(client_description) - VARHDRSZ, PG_SCOREBOARD_MAX_DESCRIPTION):
348     0;
349   if(client_description_len) {
350     memcpy(record->client_description, VARDATA(client_description), client_description_len);
351     record->client_description[client_description_len] = '\0';
352   }
353   else
354     strcpy(record->client_description, "local");
355
356   LWLockRelease(scoreboard->lockid);
357
358   PG_RETURN_VOID();
359 }
360
361 PG_FUNCTION_INFO_V1(process_status);
362 Datum
363 process_status(PG_FUNCTION_ARGS)
364 {
365   pg_scoreboard_record    *record;
366   pg_scoreboard_record_key key;
367   text *status_message = NULL;
368   int   status_message_len;
369   key.procpid = MyProcPid;
370
371   if(PG_ARGISNULL(0)) PG_RETURN_VOID();
372
373   status_message = PG_GETARG_VARCHAR_P(0);
374   setup_pg_scoreboard();
375
376   if(!registered_onexit) {
377     on_shmem_exit(exit_cb, 0);
378     registered_onexit = 1;
379   }
380
381   LWLockAcquire(scoreboard->lockid, LW_EXCLUSIVE);
382
383   record = find_record(key.procpid, 0);
384
385   if(record) {
386     gettimeofday(&record->last_update, NULL);
387     status_message_len = status_message?
388       MIN(VARSIZE(status_message) - VARHDRSZ, PG_SCOREBOARD_MAX_STATUS_MESSAGE_LEN):
389       0;
390     if(status_message_len) {
391       memcpy(record->status_message, VARDATA(status_message), status_message_len);
392       record->status_message[status_message_len] = '\0';
393     }
394     else
395       strcpy(record->status_message, "???");
396   }
397
398   LWLockRelease(scoreboard->lockid);
399
400   PG_RETURN_VOID();
401 }
402
403 PG_FUNCTION_INFO_V1(process_scoreboard);
404 Datum
405 process_scoreboard(PG_FUNCTION_ARGS) {
406   FuncCallContext  *funcctx;
407   MemoryContext     oldcontext;
408   pg_scoreboard_record *records;
409
410   if(SRF_IS_FIRSTCALL()) {
411     int i, j;
412     AttInMetadata        *attinmeta;
413     pg_scoreboard_record *record;
414     TupleDesc             tupdesc;
415
416     funcctx = SRF_FIRSTCALL_INIT();
417     oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
418
419     /* Local copy of data so we don't hold a lock for a long time */
420     funcctx->max_calls = 0;
421     setup_pg_scoreboard();
422
423     LWLockAcquire(scoreboard->lockid, LW_EXCLUSIVE);
424
425     for(i=0;i<MAX_SLOTS;i++) {
426       record = RECORDPTR(i);
427       if(record->create_time.tv_sec != 0) funcctx->max_calls++;
428     }
429     records = palloc(MAX(1,funcctx->max_calls) * sizeof(*record));
430     funcctx->user_fctx = (void *)records;
431     for(i=0,j=0;i<MAX_SLOTS;i++) {
432       record = RECORDPTR(i);
433       if(record->create_time.tv_sec != 0)
434         memcpy(&records[j++], record, sizeof(*record));
435     }
436
437     LWLockRelease(scoreboard->lockid);
438
439     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
440         ereport(ERROR,
441                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
442                  errmsg("function returning record called in context "
443                         "that cannot accept type record")));
444
445     /*
446      * generate attribute metadata needed later to produce tuples from raw
447      * C strings
448      */
449     attinmeta = TupleDescGetAttInMetadata(tupdesc);
450     funcctx->attinmeta = attinmeta;
451
452     MemoryContextSwitchTo(oldcontext);
453   }
454
455   funcctx = SRF_PERCALL_SETUP();
456   records = (pg_scoreboard_record *)funcctx->user_fctx;
457
458   if(funcctx->call_cntr < funcctx->max_calls) {
459     char      **values;
460     HeapTuple   tuple;
461     Datum       result;
462     char        datetime_scratch[32];
463     struct tm   tm, *tmr;
464     time_t      sec;
465     int         i;
466
467     /* procpid, client_addr, client_port,
468      * create_time, last_update,
469      * description, status
470      */
471
472     values = (char **) palloc(7 * sizeof(char *));
473     values[0] = (char *) palloc(8);
474     values[1] = (char *) palloc(PG_SCOREBOARD_MAX_CLIENT_ADDR_LEN + 1);
475     values[2] = (char *) palloc(8);
476     values[3] = (char *) palloc(64);
477     values[4] = (char *) palloc(64);
478     values[5] = (char *) palloc(PG_SCOREBOARD_MAX_DESCRIPTION + 1);
479     values[6] = (char *) palloc(PG_SCOREBOARD_MAX_STATUS_MESSAGE_LEN + 1);
480
481     snprintf(values[0], 8, "%d", records[funcctx->call_cntr].key.procpid);
482     snprintf(values[1], PG_SCOREBOARD_MAX_CLIENT_ADDR_LEN + 1,
483              "%s", records[funcctx->call_cntr].client_addr);
484     snprintf(values[2], 8, "%d", records[funcctx->call_cntr].client_port);
485
486     sec = records[funcctx->call_cntr].create_time.tv_sec;
487     tmr = localtime_r(&sec, &tm);
488     strftime(datetime_scratch, 32, "%Y-%m-%d %H:%M:%S", tmr);
489     snprintf(values[3], 64, "%s.%d", datetime_scratch,
490              (int)(records[funcctx->call_cntr].create_time.tv_usec/1000));
491
492     sec = records[funcctx->call_cntr].last_update.tv_sec;
493     tmr = localtime_r(&sec, &tm);
494     strftime(datetime_scratch, 32, "%Y-%m-%d %H:%M:%S", tmr);
495     snprintf(values[4], 64, "%s.%d", datetime_scratch,
496              (int)(records[funcctx->call_cntr].last_update.tv_usec/1000));
497
498     strcpy(values[5], records[funcctx->call_cntr].client_description);
499     strcpy(values[6], records[funcctx->call_cntr].status_message);
500
501     tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
502     result = HeapTupleGetDatum(tuple);
503
504     for(i=0;i<7;i++) pfree(values[i]);
505     pfree(values);
506
507     SRF_RETURN_NEXT(funcctx, result);
508   }
509   else {
510     SRF_RETURN_DONE(funcctx);
511   }
512 }
Note: See TracBrowser for help on using the browser.