Web3.py: How to Subscribe to Pending Ethereum Transactions in Python

The best way to interact with the Ethereum blockchain using Python is by using the web3.py library. This tutorial will show you how to use Ethereum subscriptions to listen for new transactions on the blockchain as they get submitted. We’ll also show you how to poll for incoming pending transactions to a specific Ethereum account of your choosing.

Subscriptions

The Web3.py library doesn’t natively support subscriptions yet. However, using the Python websockets library, we can still utilize Infura’s websockets endpoint to subscribe to transactions and events on the blockchain.

Setting Up Our Project

Get started by making sure you have the web3.py and websockets libraries installed on your machine:

pip install web3 websockets

Then, we can import the libraries needed for our project:

import asyncio
import json
from web3 import Web3
from websockets import connect

Connecting to Infura

We’re going to connect to Infura’s WebSocket endpoint so we can subscribe to new pending transactions, and to Infura’s regular HTTP endpoint so we can make JSON-RPC calls such as eth_getTransactionByHash (exposed in web3.py as web3.eth.get_transaction) to get more information about a specific transaction.

We can define the following endpoints:

infura_ws_url = 'wss://goerli.infura.io/ws/v3/<YOUR_PROJECT_ID>'
infura_http_url = 'https://goerli.infura.io/v3/<YOUR_PROJECT_ID>'
web3 = Web3(Web3.HTTPProvider(infura_http_url))

Make sure to replace <YOUR_PROJECT_ID> with your actual Infura project ID. You can use the same ID for both endpoints.
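To keep the project ID out of the source file, the two URLs above can be built from an environment variable. This is only a sketch: the helper build_infura_urls and the variable name INFURA_PROJECT_ID are hypothetical, and the URL patterns are simply the ones shown above.

```python
import os


def build_infura_urls(project_id, network="goerli"):
    """Return (websocket_url, http_url) following the URL patterns used in this tutorial."""
    return (
        f"wss://{network}.infura.io/ws/v3/{project_id}",
        f"https://{network}.infura.io/v3/{project_id}",
    )


# "INFURA_PROJECT_ID" is a hypothetical variable name chosen for this sketch.
# The placeholder fallback only keeps the example runnable without it.
project_id = os.environ.get("INFURA_PROJECT_ID", "<YOUR_PROJECT_ID>")
infura_ws_url, infura_http_url = build_infura_urls(project_id)
```

The rest of the script can then use infura_ws_url and infura_http_url exactly as before.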

Subscribing to Pending Transactions

Create a new async function that connects to Infura’s WebSocket endpoint:

async def get_event():
    async with connect(infura_ws_url) as ws:
        await ws.send('{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}')
        subscription_response = await ws.recv()
        print(subscription_response) # {"jsonrpc":"2.0","id":1,"result":"0xd67da23f62a01f58042bc73d3f1c8936"} 

Using ws.send(), we’re telling the node to start a new subscription for any new pending transactions, after which we get a confirmation back from the node with our subscription id.
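The request string can also be built with json.dumps, and the confirmation parsed to pull out the subscription ID. The following is a minimal sketch; the helper names build_subscribe_request and parse_subscription_id are hypothetical and not part of web3.py or websockets.

```python
import json


def build_subscribe_request(request_id=1):
    """Return the eth_subscribe request for new pending transactions as a JSON string."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_subscribe",
        "params": ["newPendingTransactions"],
    })


def parse_subscription_id(raw_response):
    """Return the subscription ID from the node's confirmation message.

    Raises RuntimeError if the node answered with a JSON-RPC error instead.
    """
    response = json.loads(raw_response)
    if "error" in response:
        raise RuntimeError(f"Subscription failed: {response['error']}")
    return response["result"]
```

Inside get_event(), this would be used as await ws.send(build_subscribe_request()) followed by subscription_id = parse_subscription_id(await ws.recv()).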

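Next, still inside the async with block, we can await new messages from the Infura node and print the hash of every pending transaction it announces.

while True:
    try:
        message = await asyncio.wait_for(ws.recv(), timeout=15)
    except asyncio.TimeoutError:
        # No message arrived within 15 seconds; keep listening
        continue
    except Exception as exc:
        # The connection failed or was closed; leave the loop so the caller can reconnect
        print(f"Connection error: {exc}")
        break
    try:
        response = json.loads(message)
        txHash = response['params']['result']
        print(txHash)
    except Exception as exc:
        # Report messages that can't be processed instead of failing silently
        print(f"Skipping message: {exc}")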
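Each notification from an eth_subscribe subscription is a JSON-RPC message with "method" set to "eth_subscription" and the payload under "params". As a sketch, the lookup of the transaction hash could be made more defensive like this; extract_tx_hash is a hypothetical helper, not a library function.

```python
import json


def extract_tx_hash(raw_message, subscription_id=None):
    """Return the transaction hash from a subscription notification, or None.

    If subscription_id is given, notifications for other subscriptions are ignored.
    """
    message = json.loads(raw_message)
    if message.get("method") != "eth_subscription":
        return None  # not a subscription notification
    params = message.get("params", {})
    if subscription_id is not None and params.get("subscription") != subscription_id:
        return None  # belongs to a different subscription
    result = params.get("result")
    # newPendingTransactions delivers the hash as a plain hex string
    return result if isinstance(result, str) else None
```

In the loop, txHash = extract_tx_hash(message) followed by a check for None would replace the direct dictionary lookup.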

Finally, at the bottom, let’s add an if __name__ == "__main__" statement, so our program runs when we execute it from a command line.

if __name__ == "__main__":
    while True:
        # asyncio.run() creates a fresh event loop for each (re)connection
        asyncio.run(get_event())

After running the program, you’ll see your terminal quickly filling up with the hashes of new pending Ethereum transactions!

➜  Python python3 watch_subscribe.py
{"jsonrpc":"2.0","id":1,"result":"0xf1c8df0cb54ea89828976b86f2325930"}
0x9831d16f46bfe723514594e990cb3c66824a584fd849984f28adac8fb5523702
0x1c3837ceffdd48325e19754f7b84fda4effd32c0c141b7dafa90d741cdc2c8f9
0x4f8e5706c60be6482f810af9a5d9191447d55c7441f68f4019a124d04d2a40d4
0x0e90c6b1f286b6298f01f837ea8934229af680449a5e3761585cd79139fc6531
...
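The __main__ block restarts get_event() as soon as it returns, which can hammer the endpoint if the connection keeps dropping. One option is to wait between attempts and wait longer after repeated failures. The following is a minimal sketch of that idea; run_with_backoff and all of its parameters are hypothetical names, not part of asyncio or websockets.

```python
import asyncio


async def run_with_backoff(task_factory, max_runs=None, base_delay=1.0,
                           max_delay=30.0, sleep=asyncio.sleep):
    """Run task_factory() repeatedly, pausing between runs.

    After a clean return the pause is base_delay; after each consecutive
    failure it doubles, capped at max_delay. Returns the list of delays used.
    """
    delays = []
    failures = 0
    runs = 0
    while max_runs is None or runs < max_runs:
        runs += 1
        try:
            await task_factory()
            failures = 0  # a clean return resets the backoff
        except Exception as exc:
            failures += 1
            print(f"Run {runs} failed: {exc!r}")
        delay = min(base_delay * 2 ** failures, max_delay)
        delays.append(delay)
        await sleep(delay)
    return delays
```

With this helper, the body of the __main__ block could become asyncio.run(run_with_backoff(get_event)). The max_runs and sleep parameters exist mainly so the loop can be exercised without real waiting.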

Monitoring a Specific Address for Incoming Transfers

Additionally, we can monitor incoming transactions to a specific Ethereum address. Let’s define an account we’d like to monitor first, outside of the get_event() function:

account = '<YOUR_PUBLIC_ADDRESS>'

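Then, inside the try block of our function, right after print(txHash), we can add the following. It checks whether the recipient is the address we specified and, if so, prints the transaction hash, sender address, and the value sent in Ether. The comparison is an exact string match, so account should be written in the checksummed (mixed-case) form that web3.py returns.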

tx = web3.eth.get_transaction(txHash)
if tx.to == account:
    print("Pending transaction found with the following details:")
    print({
        "hash": txHash,
        "from": tx["from"],
        # web3.py v5 name; v6 and later rename this to web3.from_wei
        "value": web3.fromWei(tx["value"], 'ether')
    })
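Two details of this check are easy to get wrong: hex addresses differ only by letter case between checksummed and lowercase forms, and the recipient is empty for contract-creation transactions. The sketch below shows a case-insensitive comparison and a wei-to-ether conversion using only the standard library. The helper names is_recipient and wei_to_ether are hypothetical, and the address in the usage note is just a sample value.

```python
from decimal import Decimal

WEI_PER_ETHER = Decimal(10) ** 18  # 1 ether = 10**18 wei


def is_recipient(tx_to, account):
    """Compare two hex addresses case-insensitively.

    tx_to is None for contract-creation transactions, which never match.
    """
    if tx_to is None:
        return False
    return tx_to.lower() == account.lower()


def wei_to_ether(value_wei):
    """Convert an integer amount of wei to ether as an exact Decimal."""
    return Decimal(value_wei) / WEI_PER_ETHER
```

For example, is_recipient("0xDE0B295669A9FD93D5F28D9EC85E40F4CB697BAE", "0xde0b295669a9fd93d5f28d9ec85e40f4cb697bae") is true, and wei_to_ether(1500000000000000000) gives Decimal("1.5").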

Complete Code Overview

Special thanks to @Christian for designing the script used in this tutorial.

import asyncio
import json
from web3 import Web3
from websockets import connect

infura_ws_url = 'wss://goerli.infura.io/ws/v3/<YOUR_PROJECT_ID>'
infura_http_url = 'https://goerli.infura.io/v3/<YOUR_PROJECT_ID>'
web3 = Web3(Web3.HTTPProvider(infura_http_url))

# Used if you want to monitor ETH transactions to a specific address
account = '<YOUR_PUBLIC_ADDRESS>' 

async def get_event():
    async with connect(infura_ws_url) as ws:
        await ws.send('{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}')
        subscription_response = await ws.recv()
        print(subscription_response)

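        while True:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=15)
            except asyncio.TimeoutError:
                # No message arrived within 15 seconds; keep listening
                continue
            except Exception as exc:
                # The connection failed or was closed; leave the loop so the caller can reconnect
                print(f"Connection error: {exc}")
                break
            try:
                response = json.loads(message)
                txHash = response['params']['result']
                print(txHash)
                # Uncomment lines below if you want to monitor transactions to
                # a specific address
                # tx = web3.eth.get_transaction(txHash)
                # if tx.to == account:
                #     print("Pending transaction found with the following details:")
                #     print({
                #         "hash": txHash,
                #         "from": tx["from"],
                #         "value": web3.fromWei(tx["value"], 'ether')
                #     })
            except Exception as exc:
                # Report messages that can't be processed instead of failing silently
                print(f"Skipping message: {exc}")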

if __name__ == "__main__":
    while True:
        # asyncio.run() creates a fresh event loop for each (re)connection
        asyncio.run(get_event())

Hey there, after less than 1 minute the script stops working with this error:

code = 1006 (connection closed abnormally [internal]), no reason
code = 1006 (connection closed abnormally [internal]), no reason
code = 1006 (connection closed abnormally [internal]), no reason
code = 1006 (connection closed abnormally [internal]), no reason
code = 1006 (connection closed abnormally [internal]), no reason

Have you solved this problem?

hi @pubbli @douqiyuan0924, I’ve retested the script and it’s been running without interruptions for 30 minutes on my end. Are you running any modifications to the script? Which networks are you encountering the error on?

It fails with the current library versions (2023-01-06):
websockets 9.1
web3-5.31.3
latest pip version
Python 3.10 on Windows

hi Alessandro, can you try upgrading the websockets package to the latest version and check again?

The first time I tried to update it, I reverted to version 9.1 because of this error:
[External image: screenshot of the error]

Now I have kept the newly installed version 10.4, and it works.
Thanks.
