1616
1717#include "raftable.h"
1818
19- #define RAFTABLE_SHMEM_SIZE (16 * 1024)
19+ #define RAFTABLE_KEY_LEN (64)
20+ #define RAFTABLE_BLOCK_LEN (256)
21+ #define RAFTABLE_BLOCKS (4096)
22+ #define RAFTABLE_BLOCK_MEM (RAFTABLE_BLOCK_LEN * (RAFTABLE_BLOCKS - 1) + sizeof(RaftableBlockMem))
2023#define RAFTABLE_HASH_SIZE (127)
21- #define RAFTABLE_VALUE_LEN 64
24+ #define RAFTABLE_SHMEM_SIZE ((1024 * 1024) + RAFTABLE_BLOCK_MEM)
25+
26+ typedef struct RaftableBlock
27+ {
28+ struct RaftableBlock * next ;
29+ char data [RAFTABLE_BLOCK_LEN - sizeof (void * )];
30+ } RaftableBlock ;
31+
32+ typedef struct RaftableKey
33+ {
34+ char data [RAFTABLE_KEY_LEN ];
35+ } RaftableKey ;
36+
37+ typedef struct RaftableEntry
38+ {
39+ RaftableKey key ;
40+ int len ;
41+ RaftableBlock * value ;
42+ } RaftableEntry ;
43+
44+ typedef struct RaftableBlockMem
45+ {
46+ RaftableBlock * free_blocks ;
47+ RaftableBlock blocks [1 ];
48+ } RaftableBlockMem ;
2249
2350void _PG_init (void );
2451void _PG_fini (void );
@@ -29,53 +56,183 @@ PG_FUNCTION_INFO_V1(raftable_sql_get);
2956PG_FUNCTION_INFO_V1 (raftable_sql_set );
3057PG_FUNCTION_INFO_V1 (raftable_sql_list );
3158
32- static HTAB * data ;
33- static LWLockId datalock ;
59+ static HTAB * hashtable ;
60+ static LWLockId hashlock ;
61+
62+ static RaftableBlockMem * blockmem ;
63+ static LWLockId blocklock ;
64+
3465static shmem_startup_hook_type PreviousShmemStartupHook ;
3566
36- typedef struct RaftableEntry
67+ static RaftableBlock * block_alloc ( void )
3768{
38- int key ;
39- char value [RAFTABLE_VALUE_LEN ];
40- } RaftableEntry ;
69+ RaftableBlock * result ;
70+
71+ LWLockAcquire (blocklock , LW_EXCLUSIVE );
72+
73+ result = blockmem -> free_blocks ;
74+ if (result == NULL )
75+ elog (ERROR , "raftable memory limit hit" );
76+
77+
78+ blockmem -> free_blocks = blockmem -> free_blocks -> next ;
79+ result -> next = NULL ;
80+ LWLockRelease (blocklock );
81+ return result ;
82+ }
83+
84+ static void block_free (RaftableBlock * block )
85+ {
86+ RaftableBlock * new_free_head = block ;
87+ Assert (block != NULL );
88+ while (block -> next != NULL ) {
89+ block = block -> next ;
90+ }
91+ LWLockAcquire (blocklock , LW_EXCLUSIVE );
92+ block -> next = blockmem -> free_blocks ;
93+ blockmem -> free_blocks = new_free_head ;
94+ LWLockRelease (blocklock );
95+ }
96+
97+
98+ static text * entry_to_text (RaftableEntry * e )
99+ {
100+ char * cursor , * buf ;
101+ RaftableBlock * block ;
102+ text * t ;
103+ int len ;
104+
105+ buf = palloc (e -> len + 1 );
106+ cursor = buf ;
107+
108+ block = e -> value ;
109+ len = e -> len ;
110+ while (block != NULL )
111+ {
112+ int tocopy = len ;
113+ if (tocopy > sizeof (block -> data ))
114+ tocopy = sizeof (block -> data );
115+
116+ memcpy (cursor , block -> data , tocopy );
117+ cursor += tocopy ;
118+ len -= tocopy ;
119+
120+ Assert (cursor - buf <= e -> len );
121+ block = block -> next ;
122+ }
123+ Assert (cursor - buf == e -> len );
124+ * cursor = '\0' ;
125+ t = cstring_to_text_with_len (buf , e -> len );
126+ pfree (buf );
127+ return t ;
128+ }
129+
130+ static void text_to_entry (RaftableEntry * e , text * t )
131+ {
132+ char * buf , * cursor ;
133+ int len ;
134+ RaftableBlock * block ;
135+
136+ buf = text_to_cstring (t );
137+ cursor = buf ;
138+ len = strlen (buf );
139+ e -> len = len ;
140+
141+ if (e -> len > 0 )
142+ {
143+ if (e -> value == NULL )
144+ e -> value = block_alloc ();
145+ Assert (e -> value != NULL );
146+ }
147+ else
148+ {
149+ if (e -> value != NULL )
150+ {
151+ block_free (e -> value );
152+ e -> value = NULL ;
153+ }
154+ }
155+
156+ block = e -> value ;
157+ while (len > 0 )
158+ {
159+ int tocopy = len ;
160+ if (tocopy > sizeof (block -> data ))
161+ {
162+ tocopy = sizeof (block -> data );
163+ if (block -> next == NULL )
164+ block -> next = block_alloc ();
165+ }
166+ else
167+ {
168+ if (block -> next != NULL )
169+ {
170+ block_free (block -> next );
171+ block -> next = NULL ;
172+ }
173+ }
174+
175+ memcpy (block -> data , cursor , tocopy );
176+ cursor += tocopy ;
177+ len -= tocopy ;
178+
179+ block = block -> next ;
180+ }
181+
182+ pfree (buf );
183+ Assert (block == NULL );
184+ }
41185
42186Datum
43187raftable_sql_get (PG_FUNCTION_ARGS )
44188{
45189 RaftableEntry * entry ;
46- int key = PG_GETARG_INT32 (0 );
190+ RaftableKey key ;
191+ text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
47192
48- LWLockAcquire (datalock , LW_SHARED );
49- entry = hash_search (data , & key , HASH_FIND , NULL );
193+ LWLockAcquire (hashlock , LW_SHARED );
194+ entry = hash_search (hashtable , & key , HASH_FIND , NULL );
50195
51196 if (entry )
52197 {
53- text * t = cstring_to_text (entry -> value );
54- LWLockRelease (datalock );
198+ text * t = entry_to_text (entry );
199+ LWLockRelease (hashlock );
55200 PG_RETURN_TEXT_P (t );
56201 }
57202 else
58203 {
59- LWLockRelease (datalock );
204+ LWLockRelease (hashlock );
60205 PG_RETURN_NULL ();
61206 }
62207}
63208
64209Datum
65210raftable_sql_set (PG_FUNCTION_ARGS )
66211{
67- int key = PG_GETARG_INT32 (0 );
212+ RaftableKey key ;
213+ text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
68214
69- LWLockAcquire (datalock , LW_EXCLUSIVE );
215+ LWLockAcquire (hashlock , LW_EXCLUSIVE );
70216 if (PG_ARGISNULL (1 ))
71- hash_search (data , & key , HASH_REMOVE , NULL );
217+ {
218+ RaftableEntry * entry = hash_search (hashtable , key .data , HASH_FIND , NULL );
219+ if ((entry != NULL ) && (entry -> len > 0 ))
220+ block_free (entry -> value );
221+ hash_search (hashtable , key .data , HASH_REMOVE , NULL );
222+ }
72223 else
73224 {
74- RaftableEntry * entry = hash_search (data , & key , HASH_ENTER , NULL );
75- entry -> key = key ;
76- text_to_cstring_buffer (PG_GETARG_TEXT_P (1 ), entry -> value , sizeof (entry -> value ));
225+ bool found ;
226+ RaftableEntry * entry = hash_search (hashtable , key .data , HASH_ENTER , & found );
227+ if (!found )
228+ {
229+ entry -> key = key ;
230+ entry -> value = NULL ;
231+ entry -> len = 0 ;
232+ }
233+ text_to_entry (entry , PG_GETARG_TEXT_P (1 ));
77234 }
78- LWLockRelease (datalock );
235+ LWLockRelease (hashlock );
79236
80237 PG_RETURN_VOID ();
81238}
@@ -96,12 +253,15 @@ raftable_sql_list(PG_FUNCTION_ARGS)
96253 oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
97254
98255 tfc = get_call_result_type (fcinfo , NULL , & funcctx -> tuple_desc );
99- Assert (tfc == TYPEFUNC_COMPOSITE );
256+ if (tfc != TYPEFUNC_COMPOSITE )
257+ {
258+ elog (ERROR , "raftable listing function should be composite" );
259+ }
100260 funcctx -> tuple_desc = BlessTupleDesc (funcctx -> tuple_desc );
101261
102262 scan = (HASH_SEQ_STATUS * )palloc (sizeof (HASH_SEQ_STATUS ));
103- LWLockAcquire (datalock , LW_SHARED );
104- hash_seq_init (scan , data );
263+ LWLockAcquire (hashlock , LW_SHARED );
264+ hash_seq_init (scan , hashtable );
105265
106266 MemoryContextSwitchTo (oldcontext );
107267
@@ -117,52 +277,59 @@ raftable_sql_list(PG_FUNCTION_ARGS)
117277 Datum vals [2 ];
118278 bool isnull [2 ];
119279
120- vals [0 ] = Int32GetDatum (entry -> key );
121- vals [1 ] = CStringGetTextDatum ( entry -> value );
280+ vals [0 ] = CStringGetTextDatum (entry -> key . data );
281+ vals [1 ] = PointerGetDatum ( entry_to_text ( entry ) );
122282 isnull [0 ] = isnull [1 ] = false;
123283
124284 tuple = heap_form_tuple (funcctx -> tuple_desc , vals , isnull );
125285 SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
126286 }
127287 else
128288 {
129- LWLockRelease (datalock );
289+ LWLockRelease (hashlock );
130290 SRF_RETURN_DONE (funcctx );
131291 }
132292
133293}
134294
135- static uint32 raftable_hash_fn (const void * key , Size keysize )
136- {
137- return (uint32 )* (int * )key ;
138- }
139-
140- static int raftable_match_fn (const void * key1 , const void * key2 , Size keysize )
141- {
142- return * (int * )key1 != * (int * )key2 ;
143- }
144-
145295static void startup_shmem (void )
146296{
147- HASHCTL info ;
148-
149297 if (PreviousShmemStartupHook ){
150298 PreviousShmemStartupHook ();
151299 }
152300
153- datalock = LWLockAssign ();
301+ {
302+ HASHCTL info ;
303+ hashlock = LWLockAssign ();
154304
155- info .keysize = sizeof (int );
156- info .entrysize = sizeof (RaftableEntry );
157- info .hash = raftable_hash_fn ;
158- info .match = raftable_match_fn ;
305+ info .keysize = sizeof (RaftableKey );
306+ info .entrysize = sizeof (RaftableEntry );
159307
160- data = ShmemInitHash (
161- "raftable" ,
162- RAFTABLE_HASH_SIZE , RAFTABLE_HASH_SIZE ,
163- & info ,
164- HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
165- );
308+ hashtable = ShmemInitHash (
309+ "raftable_hashtable" ,
310+ RAFTABLE_HASH_SIZE , RAFTABLE_HASH_SIZE ,
311+ & info , HASH_ELEM
312+ );
313+ }
314+
315+ {
316+ bool found ;
317+ int i ;
318+
319+ blocklock = LWLockAssign ();
320+
321+ blockmem = ShmemInitStruct (
322+ "raftable_blockmem" ,
323+ RAFTABLE_BLOCK_MEM ,
324+ & found
325+ );
326+
327+ for (i = 0 ; i < RAFTABLE_BLOCKS - 1 ; i ++ ) {
328+ blockmem -> blocks [i ].next = blockmem -> blocks + i + 1 ;
329+ }
330+ blockmem -> blocks [RAFTABLE_BLOCKS - 1 ].next = NULL ;
331+ blockmem -> free_blocks = blockmem -> blocks ;
332+ }
166333}
167334
168335void
@@ -171,7 +338,7 @@ _PG_init(void)
171338 if (!process_shared_preload_libraries_in_progress )
172339 elog (ERROR , "please add 'raftable' to shared_preload_libraries list" );
173340 RequestAddinShmemSpace (RAFTABLE_SHMEM_SIZE );
174- RequestAddinLWLocks (1 );
341+ RequestAddinLWLocks (2 );
175342
176343 PreviousShmemStartupHook = shmem_startup_hook ;
177344 shmem_startup_hook = startup_shmem ;
0 commit comments