9

I'd like to insert some binary data into a BYTEA column.

How would I go about streaming the contents of somefile.tar.gz into a table with a BYTEA column?

Is it possible to stream data between PostgreSQL and Go (in either direction) instead of loading it all into memory?

1
  • 2
    Did you have a look at the documentation of the packages? As mentioned there, use []byte for bytea columns in pq. See also the sqlx examples of inserting data. Commented Mar 16, 2019 at 9:26

1 Answer 1

1

If you are willing to switch to github.com/jackc/pgx, Large Objects (see the PostgreSQL docs) can be streamed. The pgx.LargeObject type implements:

io.Writer
io.Reader
io.Seeker
io.Closer

Large Objects are stored in a system table; there is no Large Object type that can be used for a column of an ordinary table. Instead, each Large Object is referenced by its object identifier (oid), so a separate table needs to be maintained that maps file metadata to the oid.

Example program:

package main

import (
    "context"
    "io"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v4"
)

// createFileTable is the DDL statement for the files table, which maps the
// oid of each Large Object to a unique file name.
const createFileTable = `CREATE TABLE files (
        id oid primary key,
        name varchar,
        unique(name)
    );`

// main demonstrates a full round trip: it connects to PostgreSQL, creates the
// files table, streams a local file into a Large Object with storeFile and
// streams it back to disk with loadFile. Any failure aborts the program.
func main() {
    // The example stores and then reloads this one file.
    const fileName = "somefile.bin"

    // context.Background is the conventional root context for main; the
    // one-minute timeout bounds every database call made below.
    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    conn, err := pgx.Connect(ctx, "user=postgres host=/run/postgresql dbname=postgres")
    if err != nil {
        // panic (unlike log.Fatal) still runs the deferred cleanup.
        panic(err)
    }
    defer conn.Close(ctx)

    if _, err = conn.Exec(ctx, createFileTable); err != nil {
        panic(err)
    }

    // The byte count is logged before the error check so that a partial
    // transfer is still reported when the call fails.
    written, err := storeFile(ctx, conn, fileName)
    log.Printf("storeFile written: %d", written)
    if err != nil {
        panic(err)
    }

    read, err := loadFile(ctx, conn, fileName)
    log.Printf("loadFile read: %d", read)
    if err != nil {
        panic(err)
    }
}

// storeFile stores the local file called name as a Large Object in the database.
// The resulting object identifier is recorded along with the file name in the
// files table, inside the same transaction that writes the object, so either
// both are persisted or neither is.
// It returns the number of bytes written and an error, if one occurred.
func storeFile(ctx context.Context, conn *pgx.Conn, name string) (written int64, err error) {
    file, err := os.Open(name)
    if err != nil {
        return 0, err
    }
    defer file.Close()

    // Large Objects can only be used inside an active transaction.
    tx, err := conn.Begin(ctx)
    if err != nil {
        return 0, err
    }
    // Undoes everything on any early return. After a successful Commit the
    // transaction is already finished, so the error of this call is ignored.
    defer tx.Rollback(ctx)

    lobs := tx.LargeObjects()

    // Create a new Large Object.
    // We pass 0, so the DB can pick an available oid for us.
    oid, err := lobs.Create(ctx, 0)
    if err != nil {
        return 0, err
    }

    // Record the oid and file name in the files table.
    _, err = tx.Exec(ctx, "INSERT INTO files (id, name) VALUES ($1, $2)", oid, name)
    if err != nil {
        return 0, err
    }

    // Open the new object for writing.
    obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeWrite)
    if err != nil {
        return 0, err
    }

    // Copy the file stream to the Large Object stream.
    written, err = io.Copy(obj, file)
    if err != nil {
        return written, err
    }

    // Close the descriptor explicitly while the transaction is still open,
    // so that a failure to close is reported instead of silently dropped.
    if err = obj.Close(); err != nil {
        return written, err
    }

    err = tx.Commit(ctx)
    return written, err
}

// loadFile looks up the Large Object registered under name in the files table
// and streams its contents to a local file with the same name, creating or
// truncating that file.
// It returns the number of bytes read from the Large Object and an error, if
// one occurred; a failure to close the local file is reported as well.
func loadFile(ctx context.Context, conn *pgx.Conn, name string) (read int64, err error) {
    // Large Objects can only be used inside an active transaction.
    tx, err := conn.Begin(ctx)
    if err != nil {
        return 0, err
    }
    // Nothing is modified in the database, so the transaction is simply
    // rolled back when the function returns.
    defer tx.Rollback(ctx)

    // Run the lookup on the transaction itself so that all statements of
    // this function visibly belong to the same unit of work.
    var oid uint32
    err = tx.QueryRow(ctx, "SELECT id FROM files WHERE name = $1", name).Scan(&oid)
    if err != nil {
        return 0, err
    }

    // Open the Large Object before touching the local file: os.Create
    // truncates an existing file, which must not happen when there is
    // nothing to read.
    lobs := tx.LargeObjects()
    obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeRead)
    if err != nil {
        return 0, err
    }
    // Best effort: the object was only read, so a close error carries no
    // information the caller could act on.
    defer obj.Close()

    file, err := os.Create(name)
    if err != nil {
        return 0, err
    }
    // Closing a written file can fail (e.g. on a delayed write error), so
    // surface that error unless an earlier one is already being returned.
    defer func() {
        if cerr := file.Close(); cerr != nil && err == nil {
            err = cerr
        }
    }()

    return io.Copy(file, obj)
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.