@@ -61,12 +61,12 @@ typedef struct
6161
6262typedef struct
6363{
64- DtmCOmmitMessage buf [TX_BUFFER_SIZE ];
64+ DtmCommitMessage data [TX_BUFFER_SIZE ];
6565 int used ;
6666} DtmTxBuffer ;
6767
6868static int * sockets ;
69- static DtmCommitMessage * * txBuffers ;
69+ static DtmTxBuffer * txBuffers ;
7070
7171static BackgroundWorker DtmSender = {
7272 "mm-sender" ,
@@ -216,18 +216,40 @@ static void acceptConnections()
216216
217217static void DtmTransSender (Datum arg )
218218{
219- txBuffer = (DtmCommitMessage * )
219+ int nNodes = dtm -> nNodes ;
220+ int i ;
221+ DtmCommitMessage * txBuffer = (DtmCommitMessage * )palloc (sizeof (DtmTxBuffer )* (nNodes ));
222+
220223 openConnections ();
221224
225+ for (i = 0 ; i < nNodes ; i ++ ) {
226+ txBuffer [i ].used = 0 ;
227+ }
228+
222229 while (true) {
223- DtmTransState * ts ;
230+ DtmTransState * ts ;
224231 PGSemaphoreLock (& dtm -> semphore );
225232
226- LWLockAcquire (& dtm -> hashLock , LW_EXCLUSIVE );
233+ SpinLockAcquire (& dtm -> spinlock );
234+ ts = dtm -> pendingTransactions ;
235+ dtm -> pendingTransactions = NULL ;
236+ SpinLockRelease (& dtm -> spinlock );
237+
227238 for (ts = dtm -> pendingTransactions ; ts != NULL ; ts = ts -> nextPending ) {
228239 int node = ts -> gtid .node ;
229240 Assert (node != MMNodeId );
230- sockets
241+ node -= 1 ;
242+ if (txBuffer [node ].used == TX_BUFFER_SIZE ) {
243+ WriteSocket (sockets [node ], txBuffer [node ].data , txBuffer [node ].used * sizeof (DtmCommitRequest ));
244+ txBuffer [node ].used = 0 ;
245+ }
246+ txBuffer [node ].data [txBuffer [node ].used ].xid = ts -> xid ;
247+ txBuffer [node ].data [txBuffer [node ].used ].csn = ts -> csn ;
248+ txBuffer [node ].used += 1 ;
249+ }
250+ dtm -> pendingTransactions = NULL ;
251+
252+ }
231253}
232254
233255static void DtmTransReceiver (Datum arg )
0 commit comments