root/jlog.h

Revision b3e61c55620ec1ab5043a1c727194d8e7fd8655f, 9.5 kB (checked in by Riley Berton <riley.berton@circonus.com>, 4 months ago)

Enhance throughput by using a pre-commit buffer space along with a multi-process control setting

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2008, Message Systems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name Message Systems, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #ifndef _JLOG_H
34 #define _JLOG_H
35
36 #include "jlog_config.h"
37
38 #ifndef JLOG_API /* JLOG_API(x) decorates the return type x of each function declared below; an includer may predefine it */
39 # ifdef _WIN32
40 #  ifdef JLOG_EXPORTS /* presumably defined while building the library itself -- confirm in the build files */
41 #   define JLOG_API(x) __declspec(dllexport) x
42 #  else /* every other Windows includer imports the symbols */
43 #   define JLOG_API(x) __declspec(dllimport) x
44 #  endif
45 # else
46 #  ifdef __cplusplus /* non-Windows C++ includers get C linkage on each declaration */
47 #    define JLOG_API(x)  extern "C" x
48 #  else /* plain C: no decoration at all */
49 #    define JLOG_API(x)  x
50 #  endif
51 # endif /* NOTE(review): the _WIN32 branch adds no extern "C" under __cplusplus -- confirm that is intended */
52 #endif
53
54 struct _jlog_ctx; /* forward declaration only; no definition appears in the visible part of this header */
55 struct _jlog_message_header; /* defined further down in this header */
56 struct _jlog_id; /* defined further down in this header */
57
58 typedef struct _jlog_ctx jlog_ctx; /* handle type passed as `ctx` to the jlog_ctx_* functions declared here */
59
60 typedef struct _jlog_message_header { /* message header made of four u_int32_t fields */
61   u_int32_t reserved; /* purpose not shown in this header */
62   u_int32_t tv_sec; /* presumably the seconds part of the message timestamp -- TODO confirm */
63   u_int32_t tv_usec; /* presumably the microseconds part of the message timestamp -- TODO confirm */
64   u_int32_t mlen; /* presumably the message length -- TODO confirm unit and meaning */
65 } jlog_message_header;
66
67 typedef struct _jlog_message_header_compressed { /* same four leading fields as jlog_message_header plus compressed_len */
68   u_int32_t reserved; /* purpose not shown in this header */
69   u_int32_t tv_sec; /* presumably the seconds part of the message timestamp -- TODO confirm */
70   u_int32_t tv_usec; /* presumably the microseconds part of the message timestamp -- TODO confirm */
71   u_int32_t mlen; /* presumably the uncompressed message length -- TODO confirm */
72   u_int32_t compressed_len; /* presumably the stored length after compression -- TODO confirm */
73 } jlog_message_header_compressed;
74
75 typedef struct _jlog_id { /* two-part identifier used for checkpoints and message positions in the API below */
76   u_int32_t log; /* presumably the number of the log file the id refers to -- TODO confirm */
77   u_int32_t marker; /* the field that JLOG_ID_ADVANCE increments */
78 } jlog_id;
79
80 #define JLOG_ID_ADVANCE(id) (id)->marker++ /* post-increments marker of the jlog_id that `id` points to and yields the old value; expansion has no outer parentheses */
81
82 typedef struct _jlog_message { /* message record taken by jlog_ctx_read_message and jlog_ctx_write_message */
83   jlog_message_header_compressed *header; /* pointer to the header; NOTE(review): may point at aligned_header below -- confirm in the implementation */
84   u_int32_t mess_len; /* presumably the length of the data at `mess` -- TODO confirm */
85   void *mess; /* presumably the message payload -- TODO confirm ownership */
86   jlog_message_header_compressed aligned_header; /* header storage embedded by value in the struct */
87 } jlog_message;
88
89 typedef enum { /* type of the `whence` argument of jlog_ctx_add_subscriber */
90   JLOG_BEGIN, /* value 0; presumably "start from the beginning of the log" -- TODO confirm */
91   JLOG_END /* value 1; presumably "start from the end of the log" -- TODO confirm */
92 } jlog_position;
93
94 typedef enum { /* type of the `safety` argument of jlog_ctx_alter_safety; values are 0, 1, 2 in declaration order */
95   JLOG_UNSAFE, /* exact durability behaviour of each level is not shown in this header */
96   JLOG_ALMOST_SAFE,
97   JLOG_SAFE
98 } jlog_safety;
99
100 typedef enum { /* jlog error codes; presumably what jlog_ctx_err reports -- TODO confirm */
101   JLOG_ERR_SUCCESS = 0, /* the only explicitly numbered value; the rest count up from it in declaration order */
102   JLOG_ERR_ILLEGAL_INIT,
103   JLOG_ERR_ILLEGAL_OPEN,
104   JLOG_ERR_OPEN,
105   JLOG_ERR_NOTDIR,
106   JLOG_ERR_CREATE_PATHLEN,
107   JLOG_ERR_CREATE_EXISTS,
108   JLOG_ERR_CREATE_MKDIR,
109   JLOG_ERR_CREATE_META,
110   JLOG_ERR_CREATE_PRE_COMMIT,
111   JLOG_ERR_LOCK,
112   JLOG_ERR_IDX_OPEN,
113   JLOG_ERR_IDX_SEEK,
114   JLOG_ERR_IDX_CORRUPT,
115   JLOG_ERR_IDX_WRITE,
116   JLOG_ERR_IDX_READ,
117   JLOG_ERR_FILE_OPEN,
118   JLOG_ERR_FILE_SEEK,
119   JLOG_ERR_FILE_CORRUPT,
120   JLOG_ERR_FILE_READ,
121   JLOG_ERR_FILE_WRITE,
122   JLOG_ERR_META_OPEN,
123   JLOG_ERR_PRE_COMMIT_OPEN,
124   JLOG_ERR_ILLEGAL_WRITE,
125   JLOG_ERR_ILLEGAL_CHECKPOINT,
126   JLOG_ERR_INVALID_SUBSCRIBER,
127   JLOG_ERR_ILLEGAL_LOGID,
128   JLOG_ERR_SUBSCRIBER_EXISTS,
129   JLOG_ERR_CHECKPOINT,
130   JLOG_ERR_NOT_SUPPORTED,
131   JLOG_ERR_CLOSE_LOGID, /* value 30; NOTE(review): the trailing comma is valid C99 but rejected by strict C89/C++03 compilers */
132 } jlog_err;
133
134 typedef enum { /* type of the `provider` argument of jlog_ctx_set_compression_provider */
135   JLOG_COMPRESSION_NULL = 0, /* presumably a pass-through provider that does not compress -- TODO confirm */
136   JLOG_COMPRESSION_LZ4 = 0x01 /* the provider named as the default in the jlog_ctx_set_use_compression notes below */
137 } jlog_compression_provider_choice;
138
139
140
141 typedef void (*jlog_error_func) (void *ctx, const char *msg, ...); /* variadic callback type installed with jlog_set_error_func; presumably printf-style, with `ctx` being the `ptr` given at registration -- TODO confirm */
142
143 JLOG_API(jlog_ctx *) jlog_new(const char *path); /* returns a jlog_ctx pointer for `path`; the notes further down call it before the set/init/open functions */
144 JLOG_API(void)      jlog_set_error_func(jlog_ctx *ctx, jlog_error_func Func, void *ptr); /* installs the callback `Func` together with `ptr` on the context */
145 JLOG_API(size_t)    jlog_raw_size(jlog_ctx *ctx); /* returns a size_t; what is measured is not shown in this header */
146 JLOG_API(int)       jlog_ctx_init(jlog_ctx *ctx); /* the usage example further down treats a non-zero return as failure */
147 JLOG_API(int)       jlog_get_checkpoint(jlog_ctx *ctx, const char *s, jlog_id *id); /* `id` is non-const; presumably receives the checkpoint of subscriber `s` -- TODO confirm */
148 JLOG_API(int)       jlog_ctx_list_subscribers_dispose(jlog_ctx *ctx, char **subs); /* presumably releases a list obtained from jlog_ctx_list_subscribers -- TODO confirm */
149 JLOG_API(int)       jlog_ctx_list_subscribers(jlog_ctx *ctx, char ***subs); /* `subs` is an out-parameter for an array of strings; meaning of the int result not shown here */
150
151 JLOG_API(int)       jlog_ctx_err(jlog_ctx *ctx); /* presumably returns the context's last jlog_err value -- TODO confirm */
152 JLOG_API(const char *) jlog_ctx_err_string(jlog_ctx *ctx); /* returns a const string; presumably describes the same error -- TODO confirm lifetime */
153 JLOG_API(int)       jlog_ctx_errno(jlog_ctx *ctx); /* presumably the saved system errno for the last failure -- TODO confirm */
154 JLOG_API(int)       jlog_ctx_open_writer(jlog_ctx *ctx); /* one of the 'open' functions that the setter notes below must precede */
155 JLOG_API(int)       jlog_ctx_open_reader(jlog_ctx *ctx, const char *subscriber); /* opens for reading under the given subscriber name */
156 JLOG_API(int)       jlog_ctx_close(jlog_ctx *ctx); /* NOTE(review): whether this also frees `ctx` is not visible here -- confirm */
157
158 JLOG_API(int)       jlog_ctx_alter_mode(jlog_ctx *ctx, int mode); /* presumably a file permission mode -- TODO confirm */
159 JLOG_API(int)       jlog_ctx_alter_journal_size(jlog_ctx *ctx, size_t size); /* the usage example further down calls this before jlog_ctx_init */
160 JLOG_API(int)       jlog_ctx_repair(jlog_ctx *ctx, int aggressive); /* `aggressive` is presumably a boolean flag -- TODO confirm */
161 JLOG_API(int)       jlog_ctx_alter_safety(jlog_ctx *ctx, jlog_safety safety); /* takes one of JLOG_UNSAFE, JLOG_ALMOST_SAFE, JLOG_SAFE */
162
163 /**
164  * Control whether this jlog process should use multi-process safe file locks when performing
165  * reads or writes.  If you do not intend to use your jlog from multiple processes, you can
166  * call this function with a zero for the `mproc` argument.  Multi-process safety defaults to
167  * being on.
168  *
169  * \sa jlog_ctx_set_pre_commit_buffer_size
170  */
171 JLOG_API(int)       jlog_ctx_set_multi_process(jlog_ctx *ctx, uint8_t mproc); /* NOTE(review): uint8_t needs <stdint.h>, presumably pulled in by jlog_config.h -- confirm */
172
173 /**
174  * Must be called after jlog_new and before the 'open' functions.
175  * Defaults to using JLOG_COMPRESSION_LZ4.
176  */
177 JLOG_API(int)       jlog_ctx_set_use_compression(jlog_ctx *ctx, uint8_t use); /* `use` is presumably a boolean flag; the example further down passes 1 */
178
179 /**
180  * Switch to another compression provider.  This has no effect on an already created jlog
181  * as you cannot change compression after a jlog is initialized.  If you want to change
182  * the compression provider you must call this before `jlog_ctx_init`, for example:
183  *
184  * ```ctx = jlog_new(path);
185  * jlog_ctx_set_use_compression(ctx, 1);
186  * jlog_ctx_set_compression_provider(ctx, JLOG_COMPRESSION_LZ4);
187  * jlog_ctx_alter_journal_size(ctx, 1024000);
188  * jlog_ctx_set_pre_commit_buffer_size(ctx, 1024 * 1024);
189  * if(jlog_ctx_init(ctx) != 0) {
190  * ...```
191  *
192  */
193 JLOG_API(int)       jlog_ctx_set_compression_provider(jlog_ctx *ctx, jlog_compression_provider_choice provider);
194
195 /**
196  * Turn on the use of a pre-commit buffer.  This will gain you increased throughput through reduction of
197  * `pwrite/v` syscalls.  Note, however, that care must be taken.  This is only safe for single-writer
198  * setups.  Merely setting multi-process to on does not protect the pre-commit space from being
199  * corrupted by another writing process.  It is safe to use the pre-commit buffer if you have multiple
200  * reader processes and a single writer process, or if you read and write from within the same process.
201  *
202  * If you intend to use multiple writing processes, you need to set the pre-commit buffer size to
203  * zero (the default for safety).
204  *
205  * There is a tradeoff here between throughput for jlog and read side latency.
206  * Because reads only happen on materialized rows (rows stored in actual jlog files), a large
207  * pre_commit buffer size will delay the materialization of the log entries in the actual
208  * storage files and therefore delay the read side.
209  *
210  * You should set this appropriately for your write throughput and read latency requirements
211  * based on the rate you expect to be writing things to the log and the size of the average
212  * logged item.
213  *
214  * This must be called before `jlog_ctx_open_writer`.  `s` is the buffer size; zero is the default.
215  *
216  */
217 JLOG_API(int)       jlog_ctx_set_pre_commit_buffer_size(jlog_ctx *ctx, size_t s);
218
219 /**
220  * Provided to deal with the read side latency problem.  If you intend to have a largish pre-commit
221  * buffer for high throughput but your throughput is variable, there are times when
222  * the rows won't be committed for the read side to see.  This call is provided to flush
223  * the pre-commit buffer whenever you want.  Normally you would wire this up to a timer event.
224  */
225 JLOG_API(int)       jlog_ctx_flush_pre_commit_buffer(jlog_ctx *ctx);
226
227 JLOG_API(int)       jlog_ctx_add_subscriber(jlog_ctx *ctx, const char *subscriber,
228                                             jlog_position whence); /* `whence` is JLOG_BEGIN or JLOG_END */
229 JLOG_API(int)       jlog_ctx_add_subscriber_copy_checkpoint(jlog_ctx *ctx,
230                                                             const char *new_subscriber,
231                                                             const char *old_subscriber); /* presumably creates new_subscriber at old_subscriber's checkpoint -- TODO confirm */
232 JLOG_API(int)       jlog_ctx_set_subscriber_checkpoint(jlog_ctx *ctx, const char *subscriber,
233                                             const jlog_id *checkpoint); /* `checkpoint` is const, so it is an input only */
234 JLOG_API(int)       jlog_ctx_remove_subscriber(jlog_ctx *ctx, const char *subscriber); /* meaning of the int result is not shown in this header */
235
236 JLOG_API(int)       jlog_ctx_write(jlog_ctx *ctx, const void *message, size_t mess_len); /* takes a read-only buffer and its length */
237 JLOG_API(int)       jlog_ctx_write_message(jlog_ctx *ctx, jlog_message *msg, struct timeval *when); /* NOTE(review): struct timeval needs <sys/time.h>, presumably via jlog_config.h -- confirm */
238 JLOG_API(int)       jlog_ctx_read_interval(jlog_ctx *ctx,
239                                            jlog_id *first_mess, jlog_id *last_mess); /* both ids are non-const; presumably outputs bounding the readable range -- TODO confirm */
240 JLOG_API(int)       jlog_ctx_read_message(jlog_ctx *ctx, const jlog_id *, jlog_message *); /* parameters are unnamed here: a const id (input) and a jlog_message (presumably filled in) */
241 JLOG_API(int)       jlog_ctx_read_checkpoint(jlog_ctx *ctx, const jlog_id *checkpoint); /* `checkpoint` is const, so it is an input only */
242 JLOG_API(int)       jlog_snprint_logid(char *buff, int n, const jlog_id *checkpoint); /* presumably formats the id into `buff` of capacity `n`, snprintf-style -- TODO confirm */
243
244 JLOG_API(int)       jlog_pending_readers(jlog_ctx *ctx, u_int32_t log, u_int32_t *earliest_ptr); /* `earliest_ptr` is an out-parameter; whether NULL is accepted is not shown here */
245 JLOG_API(int)       __jlog_pending_readers(jlog_ctx *ctx, u_int32_t log); /* NOTE(review): identifiers containing a double underscore are reserved for the implementation in C */
246 JLOG_API(int)       jlog_ctx_first_log_id(jlog_ctx *ctx, jlog_id *id); /* `id` is non-const; presumably receives the first log id -- TODO confirm */
247 JLOG_API(int)       jlog_ctx_last_log_id(jlog_ctx *ctx, jlog_id *id); /* `id` is non-const; presumably receives the last log id -- TODO confirm */
248 JLOG_API(int)       jlog_ctx_advance_id(jlog_ctx *ctx, jlog_id *cur,
249                                         jlog_id *start, jlog_id *finish); /* all three ids are non-const; which ones are updated is not shown in this header */
250 JLOG_API(int)       jlog_clean(const char *path); /* operates on a path rather than on a jlog_ctx */
251
252 #endif
Note: See TracBrowser for help on using the browser.