Skip to content

Commit 8d69c58

Browse files
committed
Initial implementation
1 parent 0d95ec6 commit 8d69c58

File tree

3 files changed

+335
-0
lines changed

3 files changed

+335
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
target/
2+
Cargo.lock

Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "postgres_large_object"
3+
version = "0.1.0"
4+
authors = ["Steven Fackler <sfackler@gmail.com>"]
5+
license = "MIT"
6+
description = "Large object support for rust-postgres"
7+
repository = "https://github.com/sfackler/rust-postgres-large-object"
8+
documentation = "https://sfackler.github.io/rust-postgres-large-object/doc/postgres_large_object"
9+
readme = "README.md"
10+
keywords = ["database", "sql", "postgres"]
11+
12+
[dependencies]
13+
postgres = "0.6.4"

src/lib.rs

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
//! A crate providing access to the Postgres large object API.
2+
#![feature(unsafe_destructor, io, core)]
3+
4+
extern crate postgres;
5+
6+
use std::old_io::{self, IoResult, IoError, IoErrorKind, SeekStyle};
7+
use std::i32;
8+
use std::slice::bytes;
9+
use std::cmp;
10+
use std::num::FromPrimitive;
11+
12+
use postgres::{Oid, Error, Result, Transaction, GenericConnection};
13+
14+
/// An extension trait adding functionality to create and delete large objects.
15+
pub trait LargeObjectExt: GenericConnection {
16+
/// Creates a new large object, returning its `Oid`.
17+
fn create_large_object(&self) -> Result<Oid>;
18+
19+
/// Deletes the large object with the specified `Oid`.
20+
fn delete_large_object(&self, oid: Oid) -> Result<()>;
21+
}
22+
23+
impl<T: GenericConnection> LargeObjectExt for T {
24+
fn create_large_object(&self) -> Result<Oid> {
25+
let stmt = try!(self.prepare_cached("SELECT pg_catalog.lo_create(0)"));
26+
stmt.query(&[]).map(|mut r| r.next().unwrap().get(0))
27+
}
28+
29+
fn delete_large_object(&self, oid: Oid) -> Result<()> {
30+
let stmt = try!(self.prepare_cached("SELECT pg_catalog.lo_unlink($1)"));
31+
stmt.execute(&[&oid]).map(|_| ())
32+
}
33+
}
34+
35+
/// Large object access modes.
36+
///
37+
/// Note that Postgres currently does not make any distinction between the
38+
/// `Write` and `ReadWrite` modes.
39+
pub enum Mode {
40+
/// An object opened in this mode may only be read from.
41+
Read,
42+
/// An object opened in this mode may be written to.
43+
Write,
44+
/// An object opened in this mode may be read from or written to.
45+
ReadWrite,
46+
}
47+
48+
impl Mode {
49+
fn to_i32(&self) -> i32 {
50+
match *self {
51+
Mode::Read => 0x00040000,
52+
Mode::Write => 0x00020000,
53+
Mode::ReadWrite => 0x00040000 | 0x00020000,
54+
}
55+
}
56+
}
57+
58+
/// An extension trait adding functionality to open large objects.
59+
pub trait LargeObjectTransactionExt {
60+
/// Opens the large object with the specified `Oid` in the specified `Mode`.
61+
fn open_large_object<'a>(&'a self, oid: Oid, mode: Mode) -> Result<LargeObject<'a>>;
62+
}
63+
64+
impl<'conn> LargeObjectTransactionExt for Transaction<'conn> {
65+
fn open_large_object<'a>(&'a self, oid: Oid, mode: Mode) -> Result<LargeObject<'a>> {
66+
let version = self.connection().parameter("server_version").unwrap();
67+
let mut version = version.split('.');
68+
let major: i32 = version.next().unwrap().parse().unwrap();
69+
let minor: i32 = version.next().unwrap().parse().unwrap();
70+
let has_64 = major > 9 || (major == 9 && minor >= 3);
71+
72+
let stmt = try!(self.prepare_cached("SELECT pg_catalog.lo_open($1, $2)"));
73+
let fd = try!(stmt.query(&[&oid, &mode.to_i32()])).next().unwrap().get(0);
74+
Ok(LargeObject {
75+
trans: self,
76+
fd: fd,
77+
has_64: has_64,
78+
finished: false,
79+
})
80+
}
81+
}
82+
83+
macro_rules! try_io {
84+
($e:expr) => {
85+
match $e {
86+
Ok(ok) => ok,
87+
Err(e) => return Err(IoError {
88+
kind: IoErrorKind::OtherIoError,
89+
desc: "error communicating with server",
90+
detail: Some(format!("{}", e)),
91+
})
92+
}
93+
}
94+
}
95+
96+
/// Represents an open large object.
97+
pub struct LargeObject<'a> {
98+
trans: &'a Transaction<'a>,
99+
fd: i32,
100+
has_64: bool,
101+
finished: bool,
102+
}
103+
104+
#[unsafe_destructor]
105+
impl<'a> Drop for LargeObject<'a> {
106+
fn drop(&mut self) {
107+
let _ = self.finish_inner();
108+
}
109+
}
110+
111+
impl<'a> LargeObject<'a> {
112+
/// Truncates the object to the specified size.
113+
///
114+
/// If `len` is larger than the size of the object, it will be padded with
115+
/// null bytes to the specified size.
116+
pub fn truncate(&mut self, len: i64) -> Result<()> {
117+
if self.has_64 {
118+
let stmt = try!(self.trans.prepare_cached("SELECT pg_catalog.lo_truncate64($1, $2)"));
119+
stmt.execute(&[&self.fd, &len]).map(|_| ())
120+
} else {
121+
let len: i32 = match FromPrimitive::from_i64(len) {
122+
Some(len) => len,
123+
None => return Err(Error::IoError(IoError {
124+
kind: IoErrorKind::InvalidInput,
125+
desc: "The database does not support objects larger than 2GB",
126+
detail: None,
127+
})),
128+
};
129+
let stmt = try!(self.trans.prepare_cached("SELECT pg_catalog.lo_truncate($1, $2)"));
130+
stmt.execute(&[&self.fd, &len]).map(|_| ())
131+
}
132+
}
133+
134+
fn finish_inner(&mut self) -> Result<()> {
135+
if self.finished {
136+
return Ok(());
137+
}
138+
139+
self.finished = true;
140+
let stmt = try!(self.trans.prepare_cached("SELECT pg_catalog.lo_close($1)"));
141+
stmt.execute(&[&self.fd]).map(|_| ())
142+
}
143+
144+
/// Consumes the `LargeObject`, cleaning up server side state.
145+
///
146+
/// Functionally identical to the `Drop` implementation on `LargeObject`
147+
/// except that it returns any errors to the caller.
148+
pub fn finish(mut self) -> Result<()> {
149+
self.finish_inner()
150+
}
151+
}
152+
153+
impl<'a> Reader for LargeObject<'a> {
154+
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
155+
let stmt = try_io!(self.trans.prepare_cached("SELECT pg_catalog.loread($1, $2)"));
156+
let cap = cmp::min(buf.len(), i32::MAX as usize) as i32;
157+
let out: Vec<u8> = try_io!(stmt.query(&[&self.fd, &cap])).next().unwrap().get(0);
158+
159+
if !buf.is_empty() && out.is_empty() {
160+
return Err(old_io::standard_error(IoErrorKind::EndOfFile));
161+
}
162+
163+
bytes::copy_memory(buf, &out);
164+
Ok(out.len())
165+
}
166+
}
167+
168+
impl<'a> Writer for LargeObject<'a> {
169+
fn write_all(&mut self, mut buf: &[u8]) -> IoResult<()> {
170+
let stmt = try_io!(self.trans.prepare_cached("SELECT pg_catalog.lowrite($1, $2)"));
171+
172+
while !buf.is_empty() {
173+
let cap = cmp::min(buf.len(), i32::MAX as usize);
174+
try_io!(stmt.execute(&[&self.fd, &&buf[..cap]]));
175+
buf = &buf[cap..];
176+
}
177+
178+
Ok(())
179+
}
180+
}
181+
182+
impl<'a> Seek for LargeObject<'a> {
183+
fn tell(&self) -> IoResult<u64> {
184+
if self.has_64 {
185+
let stmt = try_io!(self.trans.prepare_cached("SELECT pg_catalog.lo_tell64($1)"));
186+
Ok(try_io!(stmt.query(&[&self.fd])).next().unwrap().get::<_, i64>(0) as u64)
187+
} else {
188+
let stmt = try_io!(self.trans.prepare_cached("SELECT pg_catalog.lo_tell($1)"));
189+
Ok(try_io!(stmt.query(&[&self.fd])).next().unwrap().get::<_, i32>(0) as u64)
190+
}
191+
}
192+
193+
fn seek(&mut self, pos: i64, style: SeekStyle) -> IoResult<()> {
194+
let kind = match style {
195+
SeekStyle::SeekSet => 0,
196+
SeekStyle::SeekCur => 1,
197+
SeekStyle::SeekEnd => 2,
198+
};
199+
200+
if self.has_64 {
201+
let stmt = try_io!(self.trans.prepare_cached("SELECT pg_catalog.lo_lseek64($1, $2, $3)"));
202+
try_io!(stmt.execute(&[&self.fd, &pos, &kind]));
203+
} else {
204+
let pos: i32 = match FromPrimitive::from_i64(pos) {
205+
Some(pos) => pos,
206+
None => return Err(IoError {
207+
kind: IoErrorKind::InvalidInput,
208+
desc: "The database does not support seeks larger than 2GB",
209+
detail: None,
210+
}),
211+
};
212+
let stmt = try_io!(self.trans.prepare_cached("SELECT pg_catalog.lo_lseek($1, $2, $3)"));
213+
try_io!(stmt.execute(&[&self.fd, &pos, &kind]));
214+
}
215+
216+
Ok(())
217+
}
218+
}
219+
220+
#[cfg(test)]
221+
mod test {
222+
use std::old_io::SeekStyle;
223+
use postgres::{Connection, SslMode, SqlState, Error};
224+
225+
use {LargeObjectExt, LargeObjectTransactionExt, Mode};
226+
227+
#[test]
228+
fn test_create_delete() {
229+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
230+
let oid = conn.create_large_object().unwrap();
231+
conn.delete_large_object(oid).unwrap();
232+
}
233+
234+
#[test]
235+
fn test_delete_bogus() {
236+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
237+
match conn.delete_large_object(0) {
238+
Ok(()) => panic!("unexpected success"),
239+
Err(Error::DbError(ref e)) if e.code() == &SqlState::UndefinedObject => {}
240+
Err(e) => panic!("unexpected error: {:?}", e),
241+
}
242+
}
243+
244+
#[test]
245+
fn test_open_bogus() {
246+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
247+
let trans = conn.transaction().unwrap();
248+
match trans.open_large_object(0, Mode::Read) {
249+
Ok(_) => panic!("unexpected success"),
250+
Err(Error::DbError(ref e)) if e.code() == &SqlState::UndefinedObject => {}
251+
Err(e) => panic!("unexpected error: {:?}", e),
252+
};
253+
}
254+
255+
#[test]
256+
fn test_open_finish() {
257+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
258+
let trans = conn.transaction().unwrap();
259+
let oid = trans.create_large_object().unwrap();
260+
let lo = trans.open_large_object(oid, Mode::Read).unwrap();
261+
lo.finish().unwrap();
262+
}
263+
264+
#[test]
265+
fn test_write_read() {
266+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
267+
let trans = conn.transaction().unwrap();
268+
let oid = trans.create_large_object().unwrap();
269+
let mut lo = trans.open_large_object(oid, Mode::Write).unwrap();
270+
lo.write_all(b"hello world!!!").unwrap();
271+
let mut lo = trans.open_large_object(oid, Mode::Read).unwrap();
272+
assert_eq!(b"hello world!!!", lo.read_to_end().unwrap());
273+
}
274+
275+
#[test]
276+
fn test_seek_tell() {
277+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
278+
let trans = conn.transaction().unwrap();
279+
let oid = trans.create_large_object().unwrap();
280+
let mut lo = trans.open_large_object(oid, Mode::Write).unwrap();
281+
lo.write_all(b"hello world!!!").unwrap();
282+
283+
assert_eq!(14, lo.tell().unwrap());
284+
lo.seek(1, SeekStyle::SeekSet).unwrap();
285+
assert_eq!(1, lo.tell().unwrap());
286+
assert_eq!(b'e', lo.read_u8().unwrap());
287+
assert_eq!(2, lo.tell().unwrap());
288+
lo.seek(-4, SeekStyle::SeekEnd).unwrap();
289+
assert_eq!(10, lo.tell().unwrap());
290+
assert_eq!(b'd', lo.read_u8().unwrap());
291+
lo.seek(-3, SeekStyle::SeekCur).unwrap();
292+
assert_eq!(8, lo.tell().unwrap());
293+
assert_eq!(b'r', lo.read_u8().unwrap());
294+
}
295+
296+
#[test]
297+
fn test_write_with_read_fd() {
298+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
299+
let trans = conn.transaction().unwrap();
300+
let oid = trans.create_large_object().unwrap();
301+
let mut lo = trans.open_large_object(oid, Mode::Read).unwrap();
302+
assert!(lo.write_all(b"hello world!!!").is_err());
303+
}
304+
305+
#[test]
306+
fn test_truncate() {
307+
let conn = Connection::connect("postgres://postgres@localhost", &SslMode::None).unwrap();
308+
let trans = conn.transaction().unwrap();
309+
let oid = trans.create_large_object().unwrap();
310+
let mut lo = trans.open_large_object(oid, Mode::Write).unwrap();
311+
lo.write_all(b"hello world!!!").unwrap();
312+
313+
lo.truncate(5).unwrap();
314+
lo.seek(0, SeekStyle::SeekSet).unwrap();
315+
assert_eq!(b"hello", lo.read_to_end().unwrap());
316+
lo.truncate(10).unwrap();
317+
lo.seek(0, SeekStyle::SeekSet).unwrap();
318+
assert_eq!(b"hello\0\0\0\0\0", lo.read_to_end().unwrap());
319+
}
320+
}

0 commit comments

Comments
 (0)