From 42f8ec11fde3f3ebf0c8e250563c4c8c5d3997ae Mon Sep 17 00:00:00 2001 From: Alex Nachlas Date: Mon, 5 Dec 2016 16:59:56 -0500 Subject: [PATCH] First pass at removing part of. Needs more refactoring. --- lib/postgres.dart | 23 +- lib/src/client_messages.dart | 73 +++-- lib/src/connection.dart | 534 ++++++++++++++++++++++++++++++-- lib/src/connection_fsm.dart | 351 --------------------- lib/src/constants.dart | 2 - lib/src/exceptions.dart | 47 ++- lib/src/message_window.dart | 47 +-- lib/src/postgresql_codec.dart | 5 +- lib/src/query.dart | 80 ++--- lib/src/server_messages.dart | 47 +-- lib/src/substituter.dart | 10 +- lib/src/transaction_proxy.dart | 126 -------- lib/src/utf8_backed_string.dart | 2 +- test/connection_test.dart | 1 + test/encoding_test.dart | 2 +- test/interpolation_test.dart | 2 +- test/query_reuse_test.dart | 3 + test/query_test.dart | 3 + test/transaction_test.dart | 5 +- 19 files changed, 690 insertions(+), 673 deletions(-) delete mode 100644 lib/src/transaction_proxy.dart diff --git a/lib/postgres.dart b/lib/postgres.dart index 5dbe806..f8b3777 100644 --- a/lib/postgres.dart +++ b/lib/postgres.dart @@ -1,21 +1,6 @@ library postgres; -import 'dart:convert'; -import 'dart:io'; -import 'dart:async'; -import 'dart:typed_data'; - -import 'package:crypto/crypto.dart'; - -part 'src/transaction_proxy.dart'; -part 'src/client_messages.dart'; -part 'src/server_messages.dart'; -part 'src/postgresql_codec.dart'; -part 'src/substituter.dart'; -part 'src/connection.dart'; -part 'src/message_window.dart'; -part 'src/connection_fsm.dart'; -part 'src/query.dart'; -part 'src/exceptions.dart'; -part 'src/constants.dart'; -part 'src/utf8_backed_string.dart'; +export 'src/connection.dart'; +export 'src/postgresql_codec.dart'; +export 'src/exceptions.dart'; +export 'src/substituter.dart'; \ No newline at end of file diff --git a/lib/src/client_messages.dart b/lib/src/client_messages.dart index 77e23d1..269cd3e 100644 --- a/lib/src/client_messages.dart +++ b/lib/src/client_messages.dart @@ -1,6 +1,11 @@ -part of postgres; +import 'dart:typed_data'; +import 'package:crypto/crypto.dart'; +import 'package:postgres/src/constants.dart'; +import 'package:postgres/src/query.dart'; +import 'package:postgres/src/utf8_backed_string.dart'; -abstract class _ClientMessage { + +abstract class ClientMessage { static const int FormatText = 0; static const int FormatBinary = 1; @@ -44,9 +49,9 @@ abstract class _ClientMessage { return buffer.buffer.asUint8List(); } - static Uint8List aggregateBytes(List<_ClientMessage> messages) { + static Uint8List aggregateBytes(List messages) { var totalLength = - messages.fold(0, (total, _ClientMessage next) => total + next.length); + messages.fold(0, (total, ClientMessage next) => total + next.length); var buffer = new ByteData(totalLength); var offset = 0; @@ -57,8 +62,8 @@ abstract class _ClientMessage { } } -class _StartupMessage extends _ClientMessage { - _StartupMessage(String databaseName, String timeZone, {String username}) { +class StartupMessage extends ClientMessage { + StartupMessage(String databaseName, String timeZone, {String username}) { this.databaseName = new UTF8BackedString(databaseName); this.timeZone = new UTF8BackedString(timeZone); if (username != null) { @@ -85,7 +90,7 @@ class _StartupMessage extends _ClientMessage { int applyToBuffer(ByteData buffer, int offset) { buffer.setInt32(offset, length); offset += 4; - buffer.setInt32(offset, _ClientMessage.ProtocolVersion); + buffer.setInt32(offset, ClientMessage.ProtocolVersion); offset += 4; if (username != null) { @@ -110,8 +115,8 @@ class _StartupMessage extends _ClientMessage { } } -class _AuthMD5Message extends _ClientMessage { - _AuthMD5Message(String username, String password, List saltBytes) { +class AuthMD5Message extends ClientMessage { + AuthMD5Message(String username, String password, List saltBytes) { var passwordHash = md5.convert("${password}${username}".codeUnits).toString(); var saltString = new String.fromCharCodes(saltBytes); @@ -126,7 +131,7 @@ class _AuthMD5Message extends _ClientMessage { } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.PasswordIdentifier); + buffer.setUint8(offset, ClientMessage.PasswordIdentifier); offset += 1; buffer.setUint32(offset, length - 1); offset += 4; @@ -136,8 +141,8 @@ class _AuthMD5Message extends _ClientMessage { } } -class _QueryMessage extends _ClientMessage { - _QueryMessage(String queryString) { +class QueryMessage extends ClientMessage { + QueryMessage(String queryString) { this.queryString = new UTF8BackedString(queryString); } @@ -148,7 +153,7 @@ class _QueryMessage extends _ClientMessage { } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.QueryIdentifier); + buffer.setUint8(offset, ClientMessage.QueryIdentifier); offset += 1; buffer.setUint32(offset, length - 1); offset += 4; @@ -158,8 +163,8 @@ class _QueryMessage extends _ClientMessage { } } -class _ParseMessage extends _ClientMessage { - _ParseMessage(String statement, {String statementName: ""}) { +class ParseMessage extends ClientMessage { + ParseMessage(String statement, {String statementName: ""}) { this.statement = new UTF8BackedString(statement); this.statementName = new UTF8BackedString(statementName); } @@ -172,7 +177,7 @@ class _ParseMessage extends _ClientMessage { } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.ParseIdentifier); + buffer.setUint8(offset, ClientMessage.ParseIdentifier); offset += 1; buffer.setUint32(offset, length - 1); offset += 4; @@ -187,8 +192,8 @@ class _ParseMessage extends _ClientMessage { } } -class _DescribeMessage extends _ClientMessage { - _DescribeMessage({String statementName: ""}) { +class DescribeMessage extends ClientMessage { + DescribeMessage({String statementName: ""}) { this.statementName = new UTF8BackedString(statementName); } @@ -199,7 +204,7 @@ class _DescribeMessage extends _ClientMessage { } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.DescribeIdentifier); + buffer.setUint8(offset, ClientMessage.DescribeIdentifier); offset += 1; buffer.setUint32(offset, length - 1); offset += 4; @@ -212,13 +217,13 @@ class _DescribeMessage extends _ClientMessage { } } -class _BindMessage extends _ClientMessage { - _BindMessage(this.parameters, {String statementName: ""}) { +class BindMessage extends ClientMessage { + BindMessage(this.parameters, {String statementName: ""}) { typeSpecCount = parameters.where((p) => p.isBinary).length; this.statementName = new UTF8BackedString(statementName); } - List<_ParameterValue> parameters; + List parameters; UTF8BackedString statementName; int typeSpecCount; @@ -234,7 +239,7 @@ class _BindMessage extends _ClientMessage { _cachedLength = 15; _cachedLength += statementName.utf8Length; _cachedLength += inputParameterElementCount * 2; - _cachedLength += parameters.fold(0, (len, _ParameterValue paramValue) { + _cachedLength += parameters.fold(0, (len, ParameterValue paramValue) { if (paramValue.bytes == null) { return len + 4; } else { @@ -246,7 +251,7 @@ class _BindMessage extends _ClientMessage { } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.BindIdentifier); + buffer.setUint8(offset, ClientMessage.BindIdentifier); offset += 1; buffer.setUint32(offset, length - 1); offset += 4; @@ -262,13 +267,13 @@ class _BindMessage extends _ClientMessage { buffer.setUint16(offset, 1); // Apply following format code for all parameters by indicating 1 offset += 2; - buffer.setUint16(offset, _ClientMessage.FormatBinary); + buffer.setUint16(offset, ClientMessage.FormatBinary); offset += 2; // Specify format code for all params is BINARY } else if (typeSpecCount == 0) { buffer.setUint16(offset, 1); // Apply following format code for all parameters by indicating 1 offset += 2; - buffer.setUint16(offset, _ClientMessage.FormatText); + buffer.setUint16(offset, ClientMessage.FormatText); offset += 2; // Specify format code for all params is TEXT } else { // Well, we have some text and some binary, so we have to be explicit about each one @@ -278,8 +283,8 @@ class _BindMessage extends _ClientMessage { buffer.setUint16( offset, p.isBinary - ? _ClientMessage.FormatBinary - : _ClientMessage.FormatText); + ? ClientMessage.FormatBinary + : ClientMessage.FormatText); offset += 2; }); } @@ -311,15 +316,15 @@ class _BindMessage extends _ClientMessage { } } -class _ExecuteMessage extends _ClientMessage { - _ExecuteMessage(); +class ExecuteMessage extends ClientMessage { + ExecuteMessage(); int get length { return 10; } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.ExecuteIdentifier); + buffer.setUint8(offset, ClientMessage.ExecuteIdentifier); offset += 1; buffer.setUint32(offset, length - 1); offset += 4; @@ -331,15 +336,15 @@ class _ExecuteMessage extends _ClientMessage { } } -class _SyncMessage extends _ClientMessage { - _SyncMessage(); +class SyncMessage extends ClientMessage { + SyncMessage(); int get length { return 5; } int applyToBuffer(ByteData buffer, int offset) { - buffer.setUint8(offset, _ClientMessage.SyncIdentifier); + buffer.setUint8(offset, ClientMessage.SyncIdentifier); offset += 1; buffer.setUint32(offset, 4); offset += 4; diff --git a/lib/src/connection.dart b/lib/src/connection.dart index 0494849..1b03cb9 100644 --- a/lib/src/connection.dart +++ b/lib/src/connection.dart @@ -1,4 +1,11 @@ -part of postgres; +import 'dart:async'; + +import 'dart:io'; +import 'package:postgres/src/client_messages.dart'; +import 'package:postgres/src/exceptions.dart'; +import 'package:postgres/src/message_window.dart'; +import 'package:postgres/src/query.dart'; +import 'package:postgres/src/server_messages.dart'; abstract class PostgreSQLExecutionContext { /// Executes a query on this context. @@ -61,7 +68,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { this.timeoutInSeconds: 30, this.timeZone: "UTC", this.useSSL: false}) { - _connectionState = new _PostgreSQLConnectionStateClosed(); + _connectionState = new PostgreSQLConnectionStateClosed(); _connectionState.connection = this; } @@ -96,7 +103,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { /// This is [true] when this instance is first created and after it has been closed or encountered an unrecoverable error. /// If a connection has already been opened and this value is now true, the connection cannot be reopened and a new instance /// must be created. - bool get isClosed => _connectionState is _PostgreSQLConnectionStateClosed; + bool get isClosed => _connectionState is PostgreSQLConnectionStateClosed; /// Settings values from the connected database. /// @@ -105,9 +112,9 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { Map settings = {}; Socket _socket; - _MessageFramer _framer = new _MessageFramer(); + MessageFramer _framer = new MessageFramer(); - Map _reuseMap = {}; + Map _reuseMap = {}; int _reuseCounter = 0; int _processID; @@ -115,11 +122,11 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { List _salt; bool _hasConnectedPreviously = false; - _PostgreSQLConnectionState _connectionState; + PostgreSQLConnectionState _connectionState; - List<_Query> _queryQueue = []; + List _queryQueue = []; - _Query get _pendingQuery { + Query get _pendingQuery { if (_queryQueue.isEmpty) { return null; } @@ -151,17 +158,17 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { .timeout(new Duration(seconds: timeoutInSeconds)); } - _framer = new _MessageFramer(); + _framer = new MessageFramer(); _socket.listen(_readData, onError: _handleSocketError, onDone: _handleSocketClosed); var connectionComplete = new Completer(); _transitionToState( - new _PostgreSQLConnectionStateSocketConnected(connectionComplete)); + new PostgreSQLConnectionStateSocketConnected(connectionComplete)); return connectionComplete.future .timeout(new Duration(seconds: timeoutInSeconds), onTimeout: () { - _connectionState = new _PostgreSQLConnectionStateClosed(); + _connectionState = new PostgreSQLConnectionStateClosed(); _socket?.destroy(); _cancelCurrentQueries(); @@ -174,7 +181,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { /// /// After the returned [Future] completes, this connection can no longer be used to execute queries. Any queries in progress or queued are cancelled. Future close() async { - _connectionState = new _PostgreSQLConnectionStateClosed(); + _connectionState = new PostgreSQLConnectionStateClosed(); await _socket?.close(); @@ -210,7 +217,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { "Attempting to execute query, but connection is not open."); } - var query = new _Query>>( + var query = new Query>>( fmtString, substitutionValues, this, null); if (allowReuse) { query.statementIdentifier = _reuseIdentifierForQuery(query); @@ -233,7 +240,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { "Attempting to execute query, but connection is not open."); } - var query = new _Query(fmtString, substitutionValues, this, null) + var query = new Query(fmtString, substitutionValues, this, null) ..onlyReturnAffectedRowCount = true; return await _enqueue(query); @@ -273,7 +280,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { "Attempting to execute query, but connection is not open."); } - var proxy = new _TransactionProxy(this, queryBlock); + var proxy = new TransactionProxy(this, queryBlock); await _enqueue(proxy.beginQuery); @@ -286,7 +293,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { //////// - Future _enqueue(_Query query) async { + Future _enqueue(Query query) async { _queryQueue.add(query); _transitionToState(_connectionState.awake()); @@ -321,7 +328,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { }); } - void _transitionToState(_PostgreSQLConnectionState newState) { + void _transitionToState(PostgreSQLConnectionState newState) { if (identical(newState, _connectionState)) { return; } @@ -347,31 +354,31 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { var msg = _framer.popMessage().message; try { - if (msg is _ErrorResponseMessage) { + if (msg is ErrorResponseMessage) { _transitionToState(_connectionState.onErrorResponse(msg)); } else { _transitionToState(_connectionState.onMessage(msg)); } - } catch (e, st) { + } catch (e) { _handleSocketError(e); } } } void _handleSocketError(dynamic error) { - _connectionState = new _PostgreSQLConnectionStateClosed(); + _connectionState = new PostgreSQLConnectionStateClosed(); _socket.destroy(); _cancelCurrentQueries(); } void _handleSocketClosed() { - _connectionState = new _PostgreSQLConnectionStateClosed(); + _connectionState = new PostgreSQLConnectionStateClosed(); _cancelCurrentQueries(); } - void _cacheQuery(_Query query) { + void _cacheQuery(Query query) { if (query.cache == null) { return; } @@ -381,7 +388,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { } } - _QueryCache _cachedQuery(String statementIdentifier) { + QueryCache _cachedQuery(String statementIdentifier) { if (statementIdentifier == null) { return null; } @@ -389,7 +396,7 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { return _reuseMap[statementIdentifier]; } - String _reuseIdentifierForQuery(_Query q) { + String _reuseIdentifierForQuery(Query q) { var existing = _reuseMap[q.statement]; if (existing != null) { return existing.preparedStatementName; @@ -403,8 +410,483 @@ class PostgreSQLConnection implements PostgreSQLExecutionContext { } } -class _TransactionRollbackException implements Exception { - _TransactionRollbackException(this.reason); +class TransactionRollbackException implements Exception { + TransactionRollbackException(this.reason); String reason; } + +typedef Future TransactionQuerySignature( + PostgreSQLExecutionContext connection); + +class TransactionProxy implements PostgreSQLExecutionContext { + TransactionProxy(this.connection, this.executionBlock) { + beginQuery = new Query("BEGIN", {}, connection, this) + ..onlyReturnAffectedRowCount = true; + + beginQuery.onComplete.future + .then(startTransaction) + .catchError(handleTransactionQueryError); + } + + Query beginQuery; + Completer completer = new Completer(); + + Future get future => completer.future; + + Query get pendingQuery { + if (queryQueue.length > 0) { + return queryQueue.first; + } + + return null; + } + + List queryQueue = []; + PostgreSQLConnection connection; + TransactionQuerySignature executionBlock; + + Future commit() async { + await execute("COMMIT"); + } + + Future>> query(String fmtString, + {Map substitutionValues: null, + bool allowReuse: true}) async { + if (connection.isClosed) { + throw new PostgreSQLException( + "Attempting to execute query, but connection is not open."); + } + + var query = new Query>>( + fmtString, substitutionValues, connection, this); + + if (allowReuse) { + query.statementIdentifier = connection._reuseIdentifierForQuery(query); + } + + return await enqueue(query); + } + + Future execute(String fmtString, + {Map substitutionValues: null}) async { + if (connection.isClosed) { + throw new PostgreSQLException( + "Attempting to execute query, but connection is not open."); + } + + var query = new Query(fmtString, substitutionValues, connection, this) + ..onlyReturnAffectedRowCount = true; + + return enqueue(query); + } + + void cancelTransaction({String reason: null}) { + throw new TransactionRollbackException(reason); + } + + Future startTransaction(dynamic beginResults) async { + var result; + try { + result = await executionBlock(this); + } on TransactionRollbackException catch (rollback) { + queryQueue = []; + await execute("ROLLBACK"); + completer.complete(new PostgreSQLRollback._(rollback.reason)); + return; + } catch (e) { + queryQueue = []; + + await execute("ROLLBACK"); + completer.completeError(e); + return; + } + + await execute("COMMIT"); + + completer.complete(result); + } + + Future handleTransactionQueryError(dynamic err) async {} + + Future enqueue(Query query) async { + queryQueue.add(query); + connection._transitionToState(connection._connectionState.awake()); + + var result = null; + try { + result = await query.future; + + connection._cacheQuery(query); + queryQueue.remove(query); + } catch (e) { + connection._cacheQuery(query); + queryQueue.remove(query); + rethrow; + } + + return result; + } +} + +/// Represents a rollback from a transaction. +/// +/// If a transaction is cancelled using [PostgreSQLExecutionContext.cancelTransaction], the value of the [Future] +/// returned from [PostgreSQLConnection.transaction] will be an instance of this type. [reason] will be the [String] +/// value of the optional argument to [PostgreSQLExecutionContext.cancelTransaction]. +class PostgreSQLRollback { + PostgreSQLRollback._(this.reason); + + /// The reason the transaction was cancelled. + String reason; +} + +abstract class PostgreSQLConnectionState { + PostgreSQLConnection connection; + + PostgreSQLConnectionState onEnter() { + return this; + } + + PostgreSQLConnectionState awake() { + return this; + } + + PostgreSQLConnectionState onMessage(ServerMessage message) { + return this; + } + + PostgreSQLConnectionState onErrorResponse(ErrorResponseMessage message) { + var exception = new PostgreSQLException.fromFields(message.fields); + + if (exception.severity == PostgreSQLSeverity.fatal || + exception.severity == PostgreSQLSeverity.panic) { + return new PostgreSQLConnectionStateClosed(); + } + + return this; + } + + void onExit() {} +} + +/* + Closed State; starts here and ends here. + */ + +class PostgreSQLConnectionStateClosed extends PostgreSQLConnectionState {} + +/* + Socket connected, prior to any PostgreSQL handshaking - initiates that handshaking + */ + +class PostgreSQLConnectionStateSocketConnected + extends PostgreSQLConnectionState { + PostgreSQLConnectionStateSocketConnected(this.completer); + + Completer completer; + + PostgreSQLConnectionState onEnter() { + var startupMessage = new StartupMessage( + connection.databaseName, connection.timeZone, + username: connection.username); + + connection._socket.add(startupMessage.asBytes()); + + return this; + } + + PostgreSQLConnectionState onErrorResponse(ErrorResponseMessage message) { + var exception = new PostgreSQLException.fromFields(message.fields); + + completer.completeError(exception); + + return new PostgreSQLConnectionStateClosed(); + } + + PostgreSQLConnectionState onMessage(ServerMessage message) { + AuthenticationMessage authMessage = message; + + // Pass on the pending op to subsequent stages + if (authMessage.type == AuthenticationMessage.KindOK) { + return new PostgreSQLConnectionStateAuthenticated(completer); + } else if (authMessage.type == AuthenticationMessage.KindMD5Password) { + connection._salt = authMessage.salt; + + return new PostgreSQLConnectionStateAuthenticating(completer); + } + + completer.completeError( + new PostgreSQLException("Unsupported authentication type ${authMessage + .type}, closing connection.")); + + return new PostgreSQLConnectionStateClosed(); + } +} + +/* + Authenticating state + */ + +class PostgreSQLConnectionStateAuthenticating + extends PostgreSQLConnectionState { + PostgreSQLConnectionStateAuthenticating(this.completer); + + Completer completer; + + PostgreSQLConnectionState onEnter() { + var authMessage = new AuthMD5Message( + connection.username, connection.password, connection._salt); + + connection._socket.add(authMessage.asBytes()); + + return this; + } + + PostgreSQLConnectionState onErrorResponse(ErrorResponseMessage message) { + var exception = new PostgreSQLException.fromFields(message.fields); + + completer.completeError(exception); + + return new PostgreSQLConnectionStateClosed(); + } + + PostgreSQLConnectionState onMessage(ServerMessage message) { + if (message is ParameterStatusMessage) { + connection.settings[message.name] = message.value; + } else if (message is BackendKeyMessage) { + connection._secretKey = message.secretKey; + connection._processID = message.processID; + } else if (message is ReadyForQueryMessage) { + if (message.state == ReadyForQueryMessage.StateIdle) { + return new PostgreSQLConnectionStateIdle(openCompleter: completer); + } + } + + return this; + } +} + +/* + Authenticated state + */ + +class PostgreSQLConnectionStateAuthenticated + extends PostgreSQLConnectionState { + PostgreSQLConnectionStateAuthenticated(this.completer); + + Completer completer; + + PostgreSQLConnectionState onErrorResponse(ErrorResponseMessage message) { + var exception = new PostgreSQLException.fromFields(message.fields); + + completer.completeError(exception); + + return new PostgreSQLConnectionStateClosed(); + } + + PostgreSQLConnectionState onMessage(ServerMessage message) { + if (message is ParameterStatusMessage) { + connection.settings[message.name] = message.value; + } else if (message is BackendKeyMessage) { + connection._secretKey = message.secretKey; + connection._processID = message.processID; + } else if (message is ReadyForQueryMessage) { + if (message.state == ReadyForQueryMessage.StateIdle) { + return new PostgreSQLConnectionStateIdle(openCompleter: completer); + } + } + + return this; + } +} + +/* + Ready/idle state + */ + +class PostgreSQLConnectionStateIdle extends PostgreSQLConnectionState { + PostgreSQLConnectionStateIdle({this.openCompleter}); + + Completer openCompleter; + + PostgreSQLConnectionState awake() { + var pendingQuery = connection._pendingQuery; + if (pendingQuery != null) { + return processQuery(pendingQuery); + } + + return this; + } + + PostgreSQLConnectionState processQuery(Query q) { + try { + if (q.onlyReturnAffectedRowCount) { + q.sendSimple(connection._socket); + return new PostgreSQLConnectionStateBusy(q); + } + + var cached = connection._cachedQuery(q.statement); + q.sendExtended(connection._socket, cacheQuery: cached); + + return new PostgreSQLConnectionStateBusy(q); + } catch (e) { + scheduleMicrotask(() { + q.completeError(e); + connection._transitionToState(new PostgreSQLConnectionStateIdle()); + }); + + return new PostgreSQLConnectionStateDeferredFailure(); + } + } + + PostgreSQLConnectionState onEnter() { + openCompleter?.complete(); + + return awake(); + } + + PostgreSQLConnectionState onMessage(ServerMessage message) { + return this; + } +} + +/* + Busy state, query in progress + */ + +class PostgreSQLConnectionStateBusy extends PostgreSQLConnectionState { + PostgreSQLConnectionStateBusy(this.query); + + Query query; + PostgreSQLException returningException = null; + int rowsAffected = 0; + + PostgreSQLConnectionState onErrorResponse(ErrorResponseMessage message) { + // If we get an error here, then we should eat the rest of the messages + // and we are always confirmed to get a ReadyForQueryMessage to finish up. + // We should only report the error once that is done. + var exception = new PostgreSQLException.fromFields(message.fields); + returningException ??= exception; + + if (exception.severity == PostgreSQLSeverity.fatal || + exception.severity == PostgreSQLSeverity.panic) { + return new PostgreSQLConnectionStateClosed(); + } + + return this; + } + + PostgreSQLConnectionState onMessage(ServerMessage message) { + // We ignore NoData, as it doesn't tell us anything we don't already know + // or care about. + + //print("(${query.statement}) -> $message"); + + if (message is ReadyForQueryMessage) { + if (message.state == ReadyForQueryMessage.StateIdle) { + if (returningException != null) { + query.completeError(returningException); + } else { + query.complete(rowsAffected); + } + + return new PostgreSQLConnectionStateIdle(); + } else if (message.state == ReadyForQueryMessage.StateTransaction) { + if (returningException != null) { + query.completeError(returningException); + } else { + query.complete(rowsAffected); + } + + return new PostgreSQLConnectionStateReadyInTransaction( + query.transaction); + } else if (message.state == ReadyForQueryMessage.StateTransactionError) { + // This should cancel the transaction, we may have to send a commit here + query.completeError(returningException); + return new PostgreSQLConnectionStateTransactionFailure( + query.transaction); + } + } else if (message is CommandCompleteMessage) { + rowsAffected = message.rowsAffected; + } else if (message is RowDescriptionMessage) { + query.fieldDescriptions = message.fieldDescriptions; + } else if (message is DataRowMessage) { + query.addRow(message.values); + } else if (message is ParameterDescriptionMessage) { + var validationException = + query.validateParameters(message.parameterTypeIDs); + if (validationException != null) { + query.cache = null; + } + returningException ??= validationException; + } + + return this; + } +} + +/* Idle Transaction State */ + +class PostgreSQLConnectionStateReadyInTransaction + extends PostgreSQLConnectionState { + PostgreSQLConnectionStateReadyInTransaction(this.transaction); + + TransactionProxy transaction; + + PostgreSQLConnectionState onEnter() { + return awake(); + } + + PostgreSQLConnectionState awake() { + var pendingQuery = transaction.pendingQuery; + if (pendingQuery != null) { + return processQuery(pendingQuery); + } + + return this; + } + + PostgreSQLConnectionState processQuery(Query q) { + try { + if (q.onlyReturnAffectedRowCount) { + q.sendSimple(connection._socket); + return new PostgreSQLConnectionStateBusy(q); + } + + var cached = connection._cachedQuery(q.statement); + q.sendExtended(connection._socket, cacheQuery: cached); + + return new PostgreSQLConnectionStateBusy(q); + } catch (e) { + scheduleMicrotask(() { + q.completeError(e); + connection._transitionToState(new PostgreSQLConnectionStateIdle()); + }); + + return new PostgreSQLConnectionStateDeferredFailure(); + } + } +} + +/* + Transaction error state + */ + +class PostgreSQLConnectionStateTransactionFailure + extends PostgreSQLConnectionState { + PostgreSQLConnectionStateTransactionFailure(this.transaction); + + TransactionProxy transaction; + + PostgreSQLConnectionState awake() { + return new PostgreSQLConnectionStateReadyInTransaction(transaction); + } +} + +/* + Hack for deferred error + */ + +class PostgreSQLConnectionStateDeferredFailure + extends PostgreSQLConnectionState {} diff --git a/lib/src/connection_fsm.dart b/lib/src/connection_fsm.dart index 857c6e4..e69de29 100644 --- a/lib/src/connection_fsm.dart +++ b/lib/src/connection_fsm.dart @@ -1,351 +0,0 @@ -part of postgres; - -abstract class _PostgreSQLConnectionState { - PostgreSQLConnection connection; - - _PostgreSQLConnectionState onEnter() { - return this; - } - - _PostgreSQLConnectionState awake() { - return this; - } - - _PostgreSQLConnectionState onMessage(_ServerMessage message) { - return this; - } - - _PostgreSQLConnectionState onErrorResponse(_ErrorResponseMessage message) { - var exception = new PostgreSQLException._(message.fields); - - if (exception.severity == PostgreSQLSeverity.fatal || - exception.severity == PostgreSQLSeverity.panic) { - return new _PostgreSQLConnectionStateClosed(); - } - - return this; - } - - void onExit() {} -} - -/* - Closed State; starts here and ends here. - */ - -class _PostgreSQLConnectionStateClosed extends _PostgreSQLConnectionState {} - -/* - Socket connected, prior to any PostgreSQL handshaking - initiates that handshaking - */ - -class _PostgreSQLConnectionStateSocketConnected - extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateSocketConnected(this.completer); - - Completer completer; - - _PostgreSQLConnectionState onEnter() { - var startupMessage = new _StartupMessage( - connection.databaseName, connection.timeZone, - username: connection.username); - - connection._socket.add(startupMessage.asBytes()); - - return this; - } - - _PostgreSQLConnectionState onErrorResponse(_ErrorResponseMessage message) { - var exception = new PostgreSQLException._(message.fields); - - completer.completeError(exception); - - return new _PostgreSQLConnectionStateClosed(); - } - - _PostgreSQLConnectionState onMessage(_ServerMessage message) { - _AuthenticationMessage authMessage = message; - - // Pass on the pending op to subsequent stages - if (authMessage.type == _AuthenticationMessage.KindOK) { - return new _PostgreSQLConnectionStateAuthenticated(completer); - } else if (authMessage.type == _AuthenticationMessage.KindMD5Password) { - connection._salt = authMessage.salt; - - return new _PostgreSQLConnectionStateAuthenticating(completer); - } - - completer.completeError( - new PostgreSQLException("Unsupported authentication type ${authMessage - .type}, closing connection.")); - - return new _PostgreSQLConnectionStateClosed(); - } -} - -/* - Authenticating state - */ - -class _PostgreSQLConnectionStateAuthenticating - extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateAuthenticating(this.completer); - - Completer completer; - - _PostgreSQLConnectionState onEnter() { - var authMessage = new _AuthMD5Message( - connection.username, connection.password, connection._salt); - - connection._socket.add(authMessage.asBytes()); - - return this; - } - - _PostgreSQLConnectionState onErrorResponse(_ErrorResponseMessage message) { - var exception = new PostgreSQLException._(message.fields); - - completer.completeError(exception); - - return new _PostgreSQLConnectionStateClosed(); - } - - _PostgreSQLConnectionState onMessage(_ServerMessage message) { - if (message is _ParameterStatusMessage) { - connection.settings[message.name] = message.value; - } else if (message is _BackendKeyMessage) { - connection._secretKey = message.secretKey; - connection._processID = message.processID; - } else if (message is _ReadyForQueryMessage) { - if (message.state == _ReadyForQueryMessage.StateIdle) { - return new _PostgreSQLConnectionStateIdle(openCompleter: completer); - } - } - - return this; - } -} - -/* - Authenticated state - */ - -class _PostgreSQLConnectionStateAuthenticated - extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateAuthenticated(this.completer); - - Completer completer; - - _PostgreSQLConnectionState onErrorResponse(_ErrorResponseMessage message) { - var exception = new PostgreSQLException._(message.fields); - - completer.completeError(exception); - - return new _PostgreSQLConnectionStateClosed(); - } - - _PostgreSQLConnectionState onMessage(_ServerMessage message) { - if (message is _ParameterStatusMessage) { - connection.settings[message.name] = message.value; - } else if (message is _BackendKeyMessage) { - connection._secretKey = message.secretKey; - connection._processID = message.processID; - } else if (message is _ReadyForQueryMessage) { - if (message.state == _ReadyForQueryMessage.StateIdle) { - return new _PostgreSQLConnectionStateIdle(openCompleter: completer); - } - } - - return this; - } -} - -/* - Ready/idle state - */ - -class _PostgreSQLConnectionStateIdle extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateIdle({this.openCompleter}); - - Completer openCompleter; - - _PostgreSQLConnectionState awake() { - var pendingQuery = connection._pendingQuery; - if (pendingQuery != null) { - return processQuery(pendingQuery); - } - - return this; - } - - _PostgreSQLConnectionState processQuery(_Query q) { - try { - if (q.onlyReturnAffectedRowCount) { - q.sendSimple(connection._socket); - return new _PostgreSQLConnectionStateBusy(q); - } - - var cached = connection._cachedQuery(q.statement); - q.sendExtended(connection._socket, cacheQuery: cached); - - return new _PostgreSQLConnectionStateBusy(q); - } catch (e) { - scheduleMicrotask(() { - q.completeError(e); - connection._transitionToState(new _PostgreSQLConnectionStateIdle()); - }); - - return new _PostgreSQLConnectionStateDeferredFailure(); - } - } - - _PostgreSQLConnectionState onEnter() { - openCompleter?.complete(); - - return awake(); - } - - _PostgreSQLConnectionState onMessage(_ServerMessage message) { - return this; - } -} - -/* - Busy state, query in progress - */ - -class _PostgreSQLConnectionStateBusy extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateBusy(this.query); - - _Query query; - PostgreSQLException returningException = null; - int rowsAffected = 0; - - _PostgreSQLConnectionState onErrorResponse(_ErrorResponseMessage message) { - // If we get an error here, then we should eat the rest of the messages - // and we are always confirmed to get a _ReadyForQueryMessage to finish up. - // We should only report the error once that is done. - var exception = new PostgreSQLException._(message.fields); - returningException ??= exception; - - if (exception.severity == PostgreSQLSeverity.fatal || - exception.severity == PostgreSQLSeverity.panic) { - return new _PostgreSQLConnectionStateClosed(); - } - - return this; - } - - _PostgreSQLConnectionState onMessage(_ServerMessage message) { - // We ignore NoData, as it doesn't tell us anything we don't already know - // or care about. - - //print("(${query.statement}) -> $message"); - - if (message is _ReadyForQueryMessage) { - if (message.state == _ReadyForQueryMessage.StateIdle) { - if (returningException != null) { - query.completeError(returningException); - } else { - query.complete(rowsAffected); - } - - return new _PostgreSQLConnectionStateIdle(); - } else if (message.state == _ReadyForQueryMessage.StateTransaction) { - if (returningException != null) { - query.completeError(returningException); - } else { - query.complete(rowsAffected); - } - - return new _PostgreSQLConnectionStateReadyInTransaction( - query.transaction); - } else if (message.state == _ReadyForQueryMessage.StateTransactionError) { - // This should cancel the transaction, we may have to send a commit here - query.completeError(returningException); - return new _PostgreSQLConnectionStateTransactionFailure( - query.transaction); - } - } else if (message is _CommandCompleteMessage) { - rowsAffected = message.rowsAffected; - } else if (message is _RowDescriptionMessage) { - query.fieldDescriptions = message.fieldDescriptions; - } else if (message is _DataRowMessage) { - query.addRow(message.values); - } else if (message is _ParameterDescriptionMessage) { - var validationException = - query.validateParameters(message.parameterTypeIDs); - if (validationException != null) { - query.cache = null; - } - returningException ??= validationException; - } - - return this; - } -} - -/* Idle Transaction State */ - -class _PostgreSQLConnectionStateReadyInTransaction - extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateReadyInTransaction(this.transaction); - - _TransactionProxy transaction; - - _PostgreSQLConnectionState onEnter() { - return awake(); - } - - _PostgreSQLConnectionState awake() { - var pendingQuery = transaction.pendingQuery; - if (pendingQuery != null) { - return processQuery(pendingQuery); - } - - return this; - } - - _PostgreSQLConnectionState processQuery(_Query q) { - try { - if (q.onlyReturnAffectedRowCount) { - q.sendSimple(connection._socket); - return new _PostgreSQLConnectionStateBusy(q); - } - - var cached = connection._cachedQuery(q.statement); - q.sendExtended(connection._socket, cacheQuery: cached); - - return new _PostgreSQLConnectionStateBusy(q); - } catch (e) { - scheduleMicrotask(() { - q.completeError(e); - connection._transitionToState(new _PostgreSQLConnectionStateIdle()); - }); - - return new _PostgreSQLConnectionStateDeferredFailure(); - } - } -} - -/* - Transaction error state - */ - -class _PostgreSQLConnectionStateTransactionFailure - extends _PostgreSQLConnectionState { - _PostgreSQLConnectionStateTransactionFailure(this.transaction); - - _TransactionProxy transaction; - - _PostgreSQLConnectionState awake() { - return new _PostgreSQLConnectionStateReadyInTransaction(transaction); - } -} - -/* - Hack for deferred error - */ - -class _PostgreSQLConnectionStateDeferredFailure - extends _PostgreSQLConnectionState {} diff --git a/lib/src/constants.dart b/lib/src/constants.dart index bb3533c..1364c63 100644 --- a/lib/src/constants.dart +++ b/lib/src/constants.dart @@ -1,5 +1,3 @@ -part of postgres; - class UTF8ByteConstants { static const user = const [117, 115, 101, 114, 0]; static const database = const [100, 97, 116, 97, 98, 97, 115, 101, 0]; diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart index 0e1b5bb..4e27418 100644 --- a/lib/src/exceptions.dart +++ b/lib/src/exceptions.dart @@ -1,4 +1,3 @@ -part of postgres; /// The severity level of a [PostgreSQLException]. /// @@ -42,35 +41,35 @@ class PostgreSQLException implements Exception { code = ""; } - PostgreSQLException._(List<_ErrorField> errorFields, {this.stackTrace}) { + PostgreSQLException.fromFields(List errorFields, {this.stackTrace}) { var finder = (int identifer) => (errorFields.firstWhere( - (_ErrorField e) => e.identificationToken == identifer, + (ErrorField e) => e.identificationToken == identifer, orElse: () => null)); - severity = _ErrorField - .severityFromString(finder(_ErrorField.SeverityIdentifier).text); - code = finder(_ErrorField.CodeIdentifier).text; - message = finder(_ErrorField.MessageIdentifier).text; - detail = finder(_ErrorField.DetailIdentifier)?.text; - hint = finder(_ErrorField.HintIdentifier)?.text; - - internalQuery = finder(_ErrorField.InternalQueryIdentifier)?.text; - trace = finder(_ErrorField.WhereIdentifier)?.text; - schemaName = finder(_ErrorField.SchemaIdentifier)?.text; - tableName = finder(_ErrorField.TableIdentifier)?.text; - columnName = finder(_ErrorField.ColumnIdentifier)?.text; - dataTypeName = finder(_ErrorField.DataTypeIdentifier)?.text; - constraintName = finder(_ErrorField.ConstraintIdentifier)?.text; - fileName = finder(_ErrorField.FileIdentifier)?.text; - routineName = finder(_ErrorField.RoutineIdentifier)?.text; - - var i = finder(_ErrorField.PositionIdentifier)?.text; + severity = ErrorField + .severityFromString(finder(ErrorField.SeverityIdentifier).text); + code = finder(ErrorField.CodeIdentifier).text; + message = finder(ErrorField.MessageIdentifier).text; + detail = finder(ErrorField.DetailIdentifier)?.text; + hint = finder(ErrorField.HintIdentifier)?.text; + + internalQuery = finder(ErrorField.InternalQueryIdentifier)?.text; + trace = finder(ErrorField.WhereIdentifier)?.text; + schemaName = finder(ErrorField.SchemaIdentifier)?.text; + tableName = finder(ErrorField.TableIdentifier)?.text; + columnName = finder(ErrorField.ColumnIdentifier)?.text; + dataTypeName = finder(ErrorField.DataTypeIdentifier)?.text; + constraintName = finder(ErrorField.ConstraintIdentifier)?.text; + fileName = finder(ErrorField.FileIdentifier)?.text; + routineName = finder(ErrorField.RoutineIdentifier)?.text; + + var i = finder(ErrorField.PositionIdentifier)?.text; position = (i != null ? int.parse(i) : null); - i = finder(_ErrorField.InternalPositionIdentifier)?.text; + i = finder(ErrorField.InternalPositionIdentifier)?.text; internalPosition = (i != null ? int.parse(i) : null); - i = finder(_ErrorField.LineIdentifier)?.text; + i = finder(ErrorField.LineIdentifier)?.text; lineNumber = (i != null ? int.parse(i) : null); } @@ -115,7 +114,7 @@ class PostgreSQLException implements Exception { "$severity $code: $message Detail: $detail Hint: $hint Table: $tableName Column: $columnName Constraint: $constraintName"; } -class _ErrorField { +class ErrorField { static const int SeverityIdentifier = 83; static const int CodeIdentifier = 67; static const int MessageIdentifier = 77; diff --git a/lib/src/message_window.dart b/lib/src/message_window.dart index 45691f2..cb9cfc9 100644 --- a/lib/src/message_window.dart +++ b/lib/src/message_window.dart @@ -1,19 +1,22 @@ -part of postgres; +import 'dart:io'; +import 'dart:typed_data'; -class _MessageFrame { +import 'package:postgres/src/server_messages.dart'; + +class MessageFrame { static Map _messageTypeMap = { - 49: () => new _ParseCompleteMessage(), - 50: () => new _BindCompleteMessage(), - 67: () => new _CommandCompleteMessage(), - 68: () => new _DataRowMessage(), - 69: () => new _ErrorResponseMessage(), - 75: () => new _BackendKeyMessage(), - 82: () => new _AuthenticationMessage(), - 83: () => new _ParameterStatusMessage(), - 84: () => new _RowDescriptionMessage(), - 90: () => new _ReadyForQueryMessage(), - 110: () => new _NoDataMessage(), - 116: () => new _ParameterDescriptionMessage() + 49: () => new ParseCompleteMessage(), + 50: () => new BindCompleteMessage(), + 67: () => new CommandCompleteMessage(), + 68: () => new DataRowMessage(), + 69: () => new ErrorResponseMessage(), + 75: () => new BackendKeyMessage(), + 82: () => new AuthenticationMessage(), + 83: () => new ParameterStatusMessage(), + 84: () => new RowDescriptionMessage(), + 90: () => new ReadyForQueryMessage(), + 110: () => new NoDataMessage(), + 116: () => new ParameterDescriptionMessage() }; BytesBuilder _inputBuffer = new BytesBuilder(copy: false); @@ -65,16 +68,16 @@ class _MessageFrame { return bytes.length; } - _ServerMessage get message { + ServerMessage get message { var msgMaker = _messageTypeMap[type]; if (msgMaker == null) { msgMaker = () { - var msg = new _UnknownMessage()..code = type; + var msg = new UnknownMessage()..code = type; return msg; }; } - _ServerMessage msg = msgMaker(); + ServerMessage msg = msgMaker(); msg.readBytes(data); @@ -82,9 +85,9 @@ class _MessageFrame { } } -class _MessageFramer { - _MessageFrame messageInProgress = new _MessageFrame(); - List<_MessageFrame> messageQueue = []; +class MessageFramer { + MessageFrame messageInProgress = new MessageFrame(); + List messageQueue = []; void addBytes(Uint8List bytes) { var offsetIntoBytesRead = 0; @@ -95,14 +98,14 @@ class _MessageFramer { if (messageInProgress.isComplete) { messageQueue.add(messageInProgress); - messageInProgress = new _MessageFrame(); + messageInProgress = new MessageFrame(); } } while (offsetIntoBytesRead != bytes.length); } bool get hasMessage => messageQueue.isNotEmpty; - _MessageFrame popMessage() { + MessageFrame popMessage() { return messageQueue.removeAt(0); } } diff --git a/lib/src/postgresql_codec.dart b/lib/src/postgresql_codec.dart index db0a2a9..e75f2fc 100644 --- a/lib/src/postgresql_codec.dart +++ b/lib/src/postgresql_codec.dart @@ -1,4 +1,7 @@ -part of postgres; +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:postgres/src/exceptions.dart'; /// The set of available data types that [PostgreSQLConnection]s support. enum PostgreSQLDataType { diff --git a/lib/src/query.dart b/lib/src/query.dart index 500ca3c..fa91aad 100644 --- a/lib/src/query.dart +++ b/lib/src/query.dart @@ -1,7 +1,15 @@ -part of postgres; - -class _Query { - _Query(this.statement, this.substitutionValues, this.connection, +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; +import 'package:postgres/src/client_messages.dart'; +import 'package:postgres/src/connection.dart'; +import 'package:postgres/src/exceptions.dart'; +import 'package:postgres/src/postgresql_codec.dart'; +import 'package:postgres/src/substituter.dart'; + +class Query { + Query(this.statement, this.substitutionValues, this.connection, this.transaction); bool onlyReturnAffectedRowCount = false; @@ -12,32 +20,32 @@ class _Query { final String statement; final Map substitutionValues; - final _TransactionProxy transaction; + final TransactionProxy transaction; final PostgreSQLConnection connection; List specifiedParameterTypeCodes; - List<_FieldDescription> _fieldDescriptions; + List _fieldDescriptions; - List<_FieldDescription> get fieldDescriptions => _fieldDescriptions; + List get fieldDescriptions => _fieldDescriptions; - void set fieldDescriptions(List<_FieldDescription> fds) { + void set fieldDescriptions(List fds) { _fieldDescriptions = fds; cache?.fieldDescriptions = fds; } List> rows = []; - _QueryCache cache; + QueryCache cache; void sendSimple(Socket socket) { var sqlString = PostgreSQLFormat.substitute(statement, substitutionValues); - var queryMessage = new _QueryMessage(sqlString); + var queryMessage = new QueryMessage(sqlString); socket.add(queryMessage.asBytes()); } - void sendExtended(Socket socket, {_QueryCache cacheQuery: null}) { + void sendExtended(Socket socket, {QueryCache cacheQuery: null}) { if (cacheQuery != null) { fieldDescriptions = cacheQuery.fieldDescriptions; sendCachedQuery(socket, cacheQuery, substitutionValues); @@ -46,9 +54,9 @@ class _Query { } String statementName = (statementIdentifier ?? ""); - var formatIdentifiers = <_PostgreSQLFormatIdentifier>[]; + var formatIdentifiers = []; var sqlString = PostgreSQLFormat.substitute(statement, substitutionValues, - replace: (_PostgreSQLFormatIdentifier identifier, int index) { + replace: (PostgreSQLFormatIdentifier identifier, int index) { formatIdentifiers.add(identifier); return "\$$index"; @@ -62,43 +70,43 @@ class _Query { .toList(); var messages = [ - new _ParseMessage(sqlString, statementName: statementName), - new _DescribeMessage(statementName: statementName), - new _BindMessage(parameterList, statementName: statementName), - new _ExecuteMessage(), - new _SyncMessage() + new ParseMessage(sqlString, statementName: statementName), + new DescribeMessage(statementName: statementName), + new BindMessage(parameterList, statementName: statementName), + new ExecuteMessage(), + new SyncMessage() ]; if (statementIdentifier != null) { - cache = new _QueryCache(statementIdentifier, formatIdentifiers); + cache = new QueryCache(statementIdentifier, formatIdentifiers); } - socket.add(_ClientMessage.aggregateBytes(messages)); + socket.add(ClientMessage.aggregateBytes(messages)); } - void sendCachedQuery(Socket socket, _QueryCache cacheQuery, + void sendCachedQuery(Socket socket, QueryCache cacheQuery, Map substitutionValues) { var statementName = cacheQuery.preparedStatementName; var parameterList = cacheQuery.orderedParameters .map((identifier) => encodeParameter(identifier, substitutionValues)) .toList(); - var bytes = _ClientMessage.aggregateBytes([ - new _BindMessage(parameterList, statementName: statementName), - new _ExecuteMessage(), - new _SyncMessage() + var bytes = ClientMessage.aggregateBytes([ + new BindMessage(parameterList, statementName: statementName), + new ExecuteMessage(), + new SyncMessage() ]); socket.add(bytes); } - _ParameterValue encodeParameter(_PostgreSQLFormatIdentifier identifier, + ParameterValue encodeParameter(PostgreSQLFormatIdentifier identifier, Map substitutionValues) { if (identifier.typeCode != null) { - return new _ParameterValue.binary( + return new ParameterValue.binary( substitutionValues[identifier.name], identifier.typeCode); } else { - return new _ParameterValue.text(substitutionValues[identifier.name]); + return new ParameterValue.text(substitutionValues[identifier.name]); } } @@ -150,12 +158,12 @@ class _Query { String toString() => statement; } -class _QueryCache { - _QueryCache(this.preparedStatementName, this.orderedParameters); +class QueryCache { + QueryCache(this.preparedStatementName, this.orderedParameters); String preparedStatementName; - List<_PostgreSQLFormatIdentifier> orderedParameters; - List<_FieldDescription> fieldDescriptions; + List orderedParameters; + List fieldDescriptions; bool get isValid { return preparedStatementName != null && @@ -164,8 +172,8 @@ class _QueryCache { } } -class _ParameterValue { - _ParameterValue.binary(dynamic value, this.postgresType) { +class ParameterValue { + ParameterValue.binary(dynamic value, this.postgresType) { isBinary = true; bytes = PostgreSQLCodec .encodeBinary(value, this.postgresType) @@ -174,7 +182,7 @@ class _ParameterValue { length = bytes?.length ?? 0; } - _ParameterValue.text(dynamic value) { + ParameterValue.text(dynamic value) { isBinary = false; if (value != null) { bytes = UTF8.encode(PostgreSQLCodec.encode(value, escapeStrings: false)); @@ -188,7 +196,7 @@ class _ParameterValue { int length; } -class _FieldDescription { +class FieldDescription { String fieldName; int tableID; int columnID; diff --git a/lib/src/server_messages.dart b/lib/src/server_messages.dart index c0ae051..5f8293a 100644 --- a/lib/src/server_messages.dart +++ b/lib/src/server_messages.dart @@ -1,12 +1,17 @@ -part of postgres; +import 'dart:convert'; +import 'dart:typed_data'; -abstract class _ServerMessage { +import 'package:postgres/src/exceptions.dart'; +import 'package:postgres/src/query.dart'; + + +abstract class ServerMessage { void readBytes(Uint8List bytes); } -class _ErrorResponseMessage implements _ServerMessage { +class ErrorResponseMessage implements ServerMessage { PostgreSQLException generatedException; - List<_ErrorField> fields = [new _ErrorField()]; + List fields = [new ErrorField()]; void readBytes(Uint8List bytes) { var lastByteRemovedList = @@ -18,16 +23,16 @@ class _ErrorResponseMessage implements _ServerMessage { return; } - fields.add(new _ErrorField()); + fields.add(new ErrorField()); }); - generatedException = new PostgreSQLException._(fields); + generatedException = new PostgreSQLException.fromFields(fields); } String toString() => generatedException.toString(); } -class _AuthenticationMessage implements _ServerMessage { +class AuthenticationMessage implements ServerMessage { static const int KindOK = 0; static const int KindKerberosV5 = 2; static const int KindClearTextPassword = 3; @@ -56,7 +61,7 @@ class _AuthenticationMessage implements _ServerMessage { String toString() => "Authentication: $type"; } -class _ParameterStatusMessage extends _ServerMessage { +class ParameterStatusMessage extends ServerMessage { String name; String value; @@ -69,7 +74,7 @@ class _ParameterStatusMessage extends _ServerMessage { String toString() => "Parameter Message: $name $value"; } -class _ReadyForQueryMessage extends _ServerMessage { +class ReadyForQueryMessage extends ServerMessage { static const String StateIdle = "I"; static const String StateTransaction = "T"; static const String StateTransactionError = "E"; @@ -83,7 +88,7 @@ class _ReadyForQueryMessage extends _ServerMessage { String toString() => "Ready Message: $state"; } -class _BackendKeyMessage extends _ServerMessage { +class BackendKeyMessage extends ServerMessage { int processID; int secretKey; @@ -96,8 +101,8 @@ class _BackendKeyMessage extends _ServerMessage { String toString() => "Backend Key Message: $processID $secretKey"; } -class _RowDescriptionMessage extends _ServerMessage { - List<_FieldDescription> fieldDescriptions; +class RowDescriptionMessage extends ServerMessage { + List fieldDescriptions; void readBytes(Uint8List bytes) { var view = new ByteData.view(bytes.buffer, bytes.offsetInBytes); @@ -105,9 +110,9 @@ class _RowDescriptionMessage extends _ServerMessage { var fieldCount = view.getInt16(offset); offset += 2; - fieldDescriptions = <_FieldDescription>[]; + fieldDescriptions = []; for (var i = 0; i < fieldCount; i++) { - var rowDesc = new _FieldDescription(); + var rowDesc = new FieldDescription(); offset = rowDesc.parse(view, offset); fieldDescriptions.add(rowDesc); } @@ -116,7 +121,7 @@ class _RowDescriptionMessage extends _ServerMessage { String toString() => "RowDescription Message: $fieldDescriptions"; } -class _DataRowMessage extends _ServerMessage { +class DataRowMessage extends ServerMessage { List values = []; void readBytes(Uint8List bytes) { @@ -145,7 +150,7 @@ class _DataRowMessage extends _ServerMessage { String toString() => "Data Row Message: ${values}"; } -class _CommandCompleteMessage extends _ServerMessage { +class CommandCompleteMessage extends ServerMessage { int rowsAffected; static RegExp identifierExpression = new RegExp(r"[A-Z ]*"); @@ -164,19 +169,19 @@ class _CommandCompleteMessage extends _ServerMessage { String toString() => "Command Complete Message: $rowsAffected"; } -class _ParseCompleteMessage extends _ServerMessage { +class ParseCompleteMessage extends ServerMessage { void readBytes(Uint8List bytes) {} String toString() => "Parse Complete Message"; } -class _BindCompleteMessage extends _ServerMessage { +class BindCompleteMessage extends ServerMessage { void readBytes(Uint8List bytes) {} String toString() => "Bind Complete Message"; } -class _ParameterDescriptionMessage extends _ServerMessage { +class ParameterDescriptionMessage extends ServerMessage { List parameterTypeIDs; void readBytes(Uint8List bytes) { @@ -197,13 +202,13 @@ class _ParameterDescriptionMessage extends _ServerMessage { String toString() => "Parameter Description Message: $parameterTypeIDs"; } -class _NoDataMessage extends _ServerMessage { +class NoDataMessage extends ServerMessage { void readBytes(Uint8List bytes) {} String toString() => "No Data Message"; } -class _UnknownMessage extends _ServerMessage { +class UnknownMessage extends ServerMessage { Uint8List bytes; int code; diff --git a/lib/src/substituter.dart b/lib/src/substituter.dart index 5685b74..429bb44 100644 --- a/lib/src/substituter.dart +++ b/lib/src/substituter.dart @@ -1,7 +1,7 @@ -part of postgres; +import 'package:postgres/src/postgresql_codec.dart'; typedef String SQLReplaceIdentifierFunction( - _PostgreSQLFormatIdentifier identifier, int index); + PostgreSQLFormatIdentifier identifier, int index); class PostgreSQLFormat { static int _AtSignCodeUnit = "@".codeUnitAt(0); @@ -122,7 +122,7 @@ class PostgreSQLFormat { if (t.type == _PostgreSQLFormatTokenType.text) { return t.buffer; } else { - var identifier = new _PostgreSQLFormatIdentifier(t.buffer.toString()); + var identifier = new PostgreSQLFormatIdentifier(t.buffer.toString()); if (!values.containsKey(identifier.name)) { throw new FormatException( @@ -165,8 +165,8 @@ class _PostgreSQLFormatToken { StringBuffer buffer = new StringBuffer(); } -class _PostgreSQLFormatIdentifier { - _PostgreSQLFormatIdentifier(String t) { +class PostgreSQLFormatIdentifier { + PostgreSQLFormatIdentifier(String t) { var components = t.split(":"); if (components.length == 1) { name = components.first; diff --git a/lib/src/transaction_proxy.dart b/lib/src/transaction_proxy.dart deleted file mode 100644 index eed2aa8..0000000 --- a/lib/src/transaction_proxy.dart +++ /dev/null @@ -1,126 +0,0 @@ -part of postgres; - -typedef Future _TransactionQuerySignature( - PostgreSQLExecutionContext connection); - -class _TransactionProxy implements PostgreSQLExecutionContext { - _TransactionProxy(this.connection, this.executionBlock) { - beginQuery = new _Query("BEGIN", {}, connection, this) - ..onlyReturnAffectedRowCount = true; - - beginQuery.onComplete.future - .then(startTransaction) - .catchError(handleTransactionQueryError); - } - - _Query beginQuery; - Completer completer = new Completer(); - - Future get future => completer.future; - - _Query get pendingQuery { - if (queryQueue.length > 0) { - return queryQueue.first; - } - - return null; - } - - List<_Query> queryQueue = []; - PostgreSQLConnection connection; - _TransactionQuerySignature executionBlock; - - Future commit() async { - await execute("COMMIT"); - } - - Future>> query(String fmtString, - {Map substitutionValues: null, - bool allowReuse: true}) async { - if (connection.isClosed) { - throw new PostgreSQLException( - "Attempting to execute query, but connection is not open."); - } - - var query = new _Query>>( - fmtString, substitutionValues, connection, this); - - if (allowReuse) { - query.statementIdentifier = connection._reuseIdentifierForQuery(query); - } - - return await enqueue(query); - } - - Future execute(String fmtString, - {Map substitutionValues: null}) async { - if (connection.isClosed) { - throw new PostgreSQLException( - "Attempting to execute query, but connection is not open."); - } - - var query = new _Query(fmtString, substitutionValues, connection, this) - ..onlyReturnAffectedRowCount = true; - - return enqueue(query); - } - - void cancelTransaction({String reason: null}) { - throw new _TransactionRollbackException(reason); - } - - Future startTransaction(dynamic beginResults) async { - var result; - try { - result = await executionBlock(this); - } on _TransactionRollbackException catch (rollback) { - queryQueue = []; - await execute("ROLLBACK"); - completer.complete(new PostgreSQLRollback._(rollback.reason)); - return; - } catch (e) { - queryQueue = []; - - await execute("ROLLBACK"); - completer.completeError(e); - return; - } - - await execute("COMMIT"); - - completer.complete(result); - } - - Future handleTransactionQueryError(dynamic err) async {} - - Future enqueue(_Query query) async { - queryQueue.add(query); - connection._transitionToState(connection._connectionState.awake()); - - var result = null; - try { - result = await query.future; - - connection._cacheQuery(query); - queryQueue.remove(query); - } catch (e) { - connection._cacheQuery(query); - queryQueue.remove(query); - rethrow; - } - - return result; - } -} - -/// Represents a rollback from a transaction. -/// -/// If a transaction is cancelled using [PostgreSQLExecutionContext.cancelTransaction], the value of the [Future] -/// returned from [PostgreSQLConnection.transaction] will be an instance of this type. [reason] will be the [String] -/// value of the optional argument to [PostgreSQLExecutionContext.cancelTransaction]. -class PostgreSQLRollback { - PostgreSQLRollback._(this.reason); - - /// The reason the transaction was cancelled. - String reason; -} diff --git a/lib/src/utf8_backed_string.dart b/lib/src/utf8_backed_string.dart index b3ea969..01f771c 100644 --- a/lib/src/utf8_backed_string.dart +++ b/lib/src/utf8_backed_string.dart @@ -1,4 +1,4 @@ -part of postgres; +import 'dart:convert'; class UTF8BackedString { UTF8BackedString(this.string); diff --git a/test/connection_test.dart b/test/connection_test.dart index fe273c1..aa138f3 100644 --- a/test/connection_test.dart +++ b/test/connection_test.dart @@ -1,4 +1,5 @@ import 'package:postgres/postgres.dart'; +import 'package:postgres/src/exceptions.dart'; import 'package:test/test.dart'; import 'dart:io'; import 'dart:async'; diff --git a/test/encoding_test.dart b/test/encoding_test.dart index 4c195ed..b6f4840 100644 --- a/test/encoding_test.dart +++ b/test/encoding_test.dart @@ -1,5 +1,5 @@ import 'dart:convert'; -import 'package:postgres/postgres.dart'; +import 'package:postgres/src/postgresql_codec.dart'; import 'package:test/test.dart'; import 'dart:typed_data'; diff --git a/test/interpolation_test.dart b/test/interpolation_test.dart index 8f74e15..1e1e500 100644 --- a/test/interpolation_test.dart +++ b/test/interpolation_test.dart @@ -1,5 +1,5 @@ import 'dart:convert'; -import 'package:postgres/postgres.dart'; +import 'package:postgres/src/substituter.dart'; import 'package:test/test.dart'; void main() { diff --git a/test/query_reuse_test.dart b/test/query_reuse_test.dart index 8dc928b..3918df9 100644 --- a/test/query_reuse_test.dart +++ b/test/query_reuse_test.dart @@ -1,4 +1,7 @@ import 'package:postgres/postgres.dart'; +import 'package:postgres/src/exceptions.dart'; +import 'package:postgres/src/postgresql_codec.dart'; +import 'package:postgres/src/substituter.dart'; import 'package:test/test.dart'; import 'dart:async'; import 'dart:mirrors'; diff --git a/test/query_test.dart b/test/query_test.dart index 76ef30b..8f07744 100644 --- a/test/query_test.dart +++ b/test/query_test.dart @@ -1,4 +1,7 @@ import 'package:postgres/postgres.dart'; +import 'package:postgres/src/exceptions.dart'; +import 'package:postgres/src/postgresql_codec.dart'; +import 'package:postgres/src/substituter.dart'; import 'package:test/test.dart'; void main() { diff --git a/test/transaction_test.dart b/test/transaction_test.dart index 0f6588f..87a9ede 100644 --- a/test/transaction_test.dart +++ b/test/transaction_test.dart @@ -1,8 +1,7 @@ import 'package:postgres/postgres.dart'; +import 'package:postgres/src/exceptions.dart'; import 'package:test/test.dart'; -import 'dart:io'; import 'dart:async'; -import 'dart:mirrors'; void main() { group("Transaction behavior", () { @@ -293,7 +292,7 @@ void main() { await c.query("INSERT INTO t (id) VALUES (1)"); }); expect(true, false); - } on PostgreSQLException catch (e) {} + } on PostgreSQLException catch (_) {} var result = await conn.transaction((ctx) async { return await ctx.query("SELECT id FROM t");