Code A Full Node for A Proof Of Work Blockchain – From Scratch | Hacker Noon

@anikishaevAndrey Nikishaev

Machine Learning and Computer Vision Researcher. Founder LearnML.Today

In this article, I want to cover a simplified but working example of decentralized Blockchain based on Proof Of Work algorithm, some sort of simplified Bitcoin. You can think about it as a simplified version of Bitcoin.

What I will describe:
– PoW blockchain theory 
– How to write blockchain in Python from scratch
– How to write Full-Node and create decentralized nodes to run the blockchain on them

The code you can find in my GitHub: https://github.com/creotiv/full_node_blockchain

What the point in all of this?

First you should understand that Blockchain and Cryptocurrencies are not the same. Blockchain is a cryptography algorithm to store data in a decentralized way when you cant trust anyone. Cryptocurrencies are little part of all projects that used Blockchain.

Second. Idea of decentralized data storing and processing lies in the ability to create conditions where the system can be controlled only by most of the people and not by just some of them. Which blocks people, parties, countries from enforcing their rules on users of the system. Basically some sort of Free speech, but for the internet services.

What need to understand before proceeding

  1. No off chain data. As we can’t trust anyone, that implies that we cant trust any data outside our blockchain database. That means that we can’t use things like time, API calls to any services, etc. Which makes things much harder. 
    Some systems that used on-chain services called Oracles to access off-chain data, but that’s another story.
  2. Blockchain trilemma. Which come from the CAP theorem. You can pick only two. So there can’t be Blazing Fast, Decentralized, and Secure solution.

PoW Blockchain

Proof of Work is one of the oldest algorithms of consensus used in a Blockchain. Consensus — is a way how decentralized actors/nodes can agree on something that happens in the system, like data update.

So for us Blockchain is a decentralized database where we have transactions to update it state and have a consensus algorithm so nodes can agree that the update is valid. And that what is Blockchain from a global perspective.

Transactions

So assume we have many nodes, each of it has acurrent data state. So we need some way to update that state and save an order of our updates(in most cases like money transfers it matters).

We have 3 ways of doing our transactions: save new data state in the transaction, save diff of states(+$100/-$100), save state flow.

In the first one we can’t verify the chain of our modification, in case of error or attack. Even current centralized financial solutions doesn’t save just your current money balance, instead they save log of all updates to your balance, from which are calculation your current balance.

That’s why most(didn’t see them all 🙂 ) blockchain solutions use diff of state(Ethereum) or state flow pattern(Bitcoin) to describe transactions.

Here is how looks like state flow:

Each Transaction has Inputs and Outputs, where each Input pointing on the previous output. Each Output can be used as an input only 1 time.

Each input is signed by a private key of user wallet to prove that he is the owner of that Output.

So for example you have one Output from the exchange that saying that you were bought 10 coins to your wallet. And now you transferring 5 coins to your friend wallet. You left 5 coins from the previous output not spent, if you left them like this system will count them as a fee for processing your Transaction. To not pay a fee, you can add additional Output that will move money back to your wallet.

That’s the idea behind the transactions in a bitcoin-like system.

But now we need add some order to Txs, as we have a decentralized system and data can mess up.
We cant use time as other systems, because it not trusted data. That where we get Blocks, from which came the name Blockchain.

Blocks

Blocks add an order for our transactions. All Txs that go in the same block considering running at the same time.

Here is how they look like:

Each block consists of data(list of transactions), hash of the block(got from mining), and link to the previous block hash, and some additional data like time, nonce, block index.

First transaction in a block is a COINBASE transaction, that doesnt have previous hash and is used to give reward for mining.

Each block hash is constructed based on Tx hashes, previous block hash, nonce and other data. Thus you cant replace block from inside the chain. The only way how you can change the data in the whole blockchain is to recompute hashes for all blocks. And that’s where the Proof of Work algorithm takes his place.

Proof Of Work algorithm

As recomputing hashes it’s not a problem, we need some mechanism to make this unreal. PoW one of such things.
Instead of just use any hash for our block, PoW set rules on which hash we can consider valid. Such rules add a level of difficulty to calculate the hash, cause now it need many iterations to find a valid hash, thus it takes time and resources.

From that point we see that attackers can’t modify running blockchain without having at least 51% of all resources used in it. Because not forget that block mined decentralized by thousands of different nodes.

Mining

Mining is a process of finding hash for the Block that will be considered valid by the system.

If you look at the block description you will find nonce field. That field is used to change the hash for mining, as we can change block data. 
That’s working because even small modification to data, makes hash totally different.

For each mined Block miner receive some reward and also all fees from Txs.

With some amount of blocks reward and difficulty of hash rules may change. That’s how the total supply of coins is limited.

Merkel Tree

Merkel Tree — is an algorithm to hash data with a tree structure, which adds a possibility to update a resulting hash without whole tree re-computation when new data added.

It is used to hash list of transactions in a Block and save resources during mining by removing the need to recompute all Txs hashes each time.

Also, it gives the ability to find if some data in a tree with less computation needed.

System together

We have some number of nodes. Each of them has a duplicate of the whole data of the blockchain(such nodes called Full-Node).

Nodes are using the gossip communication principle — If we get data that we didn’t see we broadcast it to other nodes that we know. In such a way data like Txs, blocks, new nodes addresses, etc, are shared across all nodes.

When some time/number of Txs passed from the adding(mining) of the last block, nodes start mining for a new block concurrently, the first who mine it, add it to his chain and share to other nodes, they validate it and if it ok add it also and broadcast it farther.

In some situations Split Brain situation may occur, when two nodes mine two different blocks at the ~same time. In such case some nodes continue mining new Block based on first Block, and other on the second. The first chain which will be longer wins and will be added to the blockchain.

Problems

  1. Running mining concurrently and increasing difficulty use too much resources, that could be used more useful.
  2. As difficulty too big for one person to mine, people gathered in a mining pools, which uses their resources to mine block and spread reward among all participants.

As we see if 3 pool merges, they will have more than 51% mining resources, which will give them ability to compromise the network. Also it shows that bitcoin is not such decentralized solution as many people think.

Codding part

You can proceed straight to the code: https://github.com/creotiv/full_node_blockchain

Project structure:

I tried to not add different hard things that are not related to the blockchain directly, to minimize code amount so you are not lost in it. Also nodes don’t save their states on shut down.

What i covered in this demo:

    1. Blockchain based on Proof Of Work algorithm
    2. Transaction spent control. Each Tx Input pointed to the previous Tx Output
    3. Signed Inputs by wallet private key
    4. Using Merkel Tree for faster Block hash computation during mining
    5. Mining process
    6. Sync process between nodes
    7. Transaction and Block verifiers
    8. Same configuration on reward and difficulty for all blocks. Thus no supply limits.
    9. Nodes blocks, txs, nodes address gossip broadcast
    10. Covering Blockchain split brain situation but only for one level.
    11. Openapi schema + UI (generated by FastAPI)
    12. Some tests for blockchain. Cause it very simple to mess things up with all these hashes

What i didn’t covered in this demo:

  1. Multi-level split brain
  2. Automatic node discovery in subnets through service discovery protocol and ping
  3. Integration testing
  4. Byzantine testing
  5. Many things that real blockchain solution has. If you interesting in such, you can open Bitcoin or Ethereum after reading this.
  6. Multiprocessing for mining
  7. Light client
  8. No limitation on block sizes or number of Txs. Block mining starts after previous mining ends.

Wallet

import rsa
import binascii


class Address:
    def __init__(self, addr):
        if isinstance(addr, rsa.PublicKey):
            self.addr = addr
        else:
            if isinstance(addr,str):
                addr = addr.encode()
            # thats not clean bu i didnt find simple crypto library for 512 sha key
            # to get address/public_key short. 
            self.addr = rsa.PublicKey.load_pkcs1(b'-----BEGIN RSA PUBLIC KEY-----n%bn-----END RSA PUBLIC KEY-----n' % addr)

    def __str__(self):
        return b''.join(self.addr.save_pkcs1().split(b'n')[1:-2]).decode()

    @property
    def key(self):
        return self.addr

class Wallet:
    '''For real case wallet use ECDSA cryptography'''

    __slots__ = '_pub', '_priv'
    
    def __init__(self, pub=None, priv=None):
        if pub:
            self._pub = Address(pub)
            self._priv = rsa.PrivateKey.load_pkcs1(priv)

    @classmethod
    def create(cls):
        inst = cls(b'',b'')
        _pub, _priv = rsa.newkeys(512)
        inst._pub = Address(_pub)
        inst._priv = _priv
        return inst

    @classmethod
    def verify(cls, data, signature, address):
        signature = binascii.unhexlify(signature.encode())
        if not isinstance(address, Address):
            address = Address(address)
        try:
            return rsa.verify(data, signature, address.key) == 'SHA-256'
        except:
            return False 
    
    @property
    def address(self):
        return str(self._pub)

    @property
    def priv(self):
        return self._priv.save_pkcs1()

    def sign(self, hash):
        return binascii.hexlify(rsa.sign(hash, self._priv, 'SHA-256')).decode()

We create some wrapper around RSA python library. Wallet consists from 2 keys: public and private. The Public key is our blockchain address, and it used to verify the signature on data, which we make with our private key(which is not shared with anyone). 
In Bitcoin and other solution ECDSA cryptography are used instead of RSA

Blocks

import time
from hashlib import sha256
from merkletools import MerkleTools

from .wallet import Address


class Input:
    __slots__ = 'prev_tx_hash', 'output_index', 'signature', '_hash', 'address', 'index', 'amount'

    def __init__(self, prev_tx_hash, output_index, address, index=0):
        self.prev_tx_hash = prev_tx_hash
        self.output_index = output_index
        self.address = address
        self.index = 0
        self._hash = None
        self.signature = None
        self.amount = None

    def sign(self, wallet):
        hash_string = '{}{}{}{}'.format(
            self.prev_tx_hash, self.output_index, self.address, self.index
        ).encode()
        self.signature = wallet.sign(hash_string)

    @property
    def hash(self):
        if self._hash:
            return self._hash
        if not self.signature and self.prev_tx_hash != 'COINBASE':
            raise Exception('Sing the input first')
        hash_string = '{}{}{}{}'.format(
            self.prev_tx_hash, self.output_index, self.address, self.signature, self.index
        )
        self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
        return self._hash

    @property
    def as_dict(self):
        return {
            "prev_tx_hash":self.prev_tx_hash,
            "output_index":self.output_index,
            "address":str(self.address),
            "index":self.index,
            "hash":self.hash,
            "signature":self.signature
        }

    @classmethod
    def from_dict(cls, data):
        inst = cls(
            data['prev_tx_hash'],
            data['output_index'],
            Address(data['address']),
            data['index'],
        )
        inst.signature = data['signature']
        inst._hash = None
        return inst
        

class Output:
    __slots__ = '_hash', 'address', 'index', 'amount', 'input_hash'

    def __init__(self, address, amount, index=0):
        self.address = address
        self.index = 0
        self.amount = int(amount)
        # i use input hash here to make output hash unique, especialy for COINBASE tx
        self.input_hash = None
        self._hash = None

    @property
    def hash(self):
        if self._hash:
            return self._hash
   
        hash_string = '{}{}{}{}'.format(
            self.amount, self.index, self.address, self.input_hash
        )
        self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
        return self._hash

    @property
    def as_dict(self):
        return {
            "amount":int(self.amount),
            "address":str(self.address),
            "index":self.index,
            "input_hash": self.input_hash,
            "hash":self.hash
        }
        
    @classmethod
    def from_dict(cls, data):
        inst = cls(
            Address(data['address']),
            data['amount'],
            data['index'],
        )
        inst.input_hash = data['input_hash']
        inst._hash = None
        return inst

class Tx:
    __slots__ = 'inputs', 'outputs', 'timestamp', '_hash'

    def __init__(self, inputs, outputs, timestamp=None):   
        self.inputs = inputs
        self.outputs = outputs
        self.timestamp = timestamp or int(time.time())
        self._hash = None

    @property
    def hash(self):
        if self._hash:
            return self._hash

        # calculating input_hash for outputs
        inp_hash = sha256((str([el.as_dict for el in self.inputs]) + str(self.timestamp)).encode()).hexdigest()
        for el in self.outputs:
            el.input_hash = inp_hash

        hash_string = '{}{}{}'.format(
            [el.as_dict for el in self.inputs], [el.as_dict for el in self.outputs], self.timestamp
        )
        self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
        return self._hash

    @property
    def as_dict(self):
        inp_hash = sha256((str([el.as_dict for el in self.inputs]) + str(self.timestamp)).encode()).hexdigest()
        for el in self.outputs:
            el.input_hash = inp_hash
        return {
            "inputs":[el.as_dict for el in self.inputs],
            "outputs":[el.as_dict for el in self.outputs],
            "timestamp":self.timestamp,
            "hash":self.hash
        }

    @classmethod
    def from_dict(cls, data):
        inps = [Input.from_dict(el) for el in data['inputs']]
        outs = [Output.from_dict(el) for el in data['outputs']]
        inp_hash = sha256((str([el.as_dict for el in inps]) + str(data['timestamp'])).encode()).hexdigest()
        for el in outs:
            el.input_hash = inp_hash
            
        inst = cls(
            inps,
            outs,
            data['timestamp'],
        )
        inst._hash = None
        return inst


class Block:

    __slots__ = 'nonce', 'prev_hash', 'index', 'txs', 'timestamp', 'merkel_root'

    def __init__(self, txs, index, prev_hash, timestamp=None, nonce=0):
        self.txs = txs or []
        self.prev_hash = prev_hash
        self.index = index
        self.nonce = nonce
        self.timestamp = timestamp or int(time.time())
        self.merkel_root = None

    def build_merkel_tree(self):
        """
        Merkel Tree used to hash all the transactions, and on mining do not recompute Txs hash everytime
        Which making things much faster. 
        And tree used because we can append new Txs and rebuild root hash much faster, when just building 
        block before mine it.
        """
        if self.merkel_root:
            return self.merkel_root
        mt = MerkleTools(hash_type="SHA256")
        for el in self.txs:
            mt.add_leaf(el.hash)
        mt.make_tree()
        self.merkel_root = mt.get_merkle_root()
        return self.merkel_root

    def hash(self, nonce=None):
        if nonce:
            self.nonce = nonce
        block_string = '{}{}{}{}{}'.format(
            self.build_merkel_tree(), self.prev_hash, self.index, self.nonce, self.timestamp
        )
        return sha256(sha256(block_string.encode()).hexdigest().encode('utf8')).hexdigest()

    @property
    def as_dict(self):
        return {
            "index": self.index,
            "timestamp": self.timestamp,
            "prev_hash": self.prev_hash,
            "hash": self.hash(),
            "txs": [el.as_dict for el in self.txs],
            "nonce": self.nonce,
            "merkel_root":self.merkel_root
        }

    @classmethod
    def from_dict(cls, data):
        return cls(
            [Tx.from_dict(el) for el in data['txs']],
            data['index'],
            data['prev_hash'],
            data['timestamp'],
            data['nonce']
        )

In a blocks.py we describe our blockchain building blocks as Txs, Input, Output and Block . Each class has hash, as_dict, from_dict methods.

We sign each Input with our wallet instance.

Output class has field input_hash that used to create a unique hash for each output in a transaction, in other cases it would be similar in many cases

As i said before we use the Merkel Tree algorithm to hash all transactions in a block to speed up mining

Verifiers

import rsa
import binascii

from .wallet import Address

class TxVerifier:
    def __init__(self, db):
        self.db = db

    def verify(self, inputs, outputs):
        total_amount_in = 0
        total_amount_out = 0
        for i,inp in enumerate(inputs):
            if inp.prev_tx_hash == 'COINBASE' and i == 0:
                total_amount_in = int(self.db.config['mining_reward'])
                continue
 
            try:
                out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
            except KeyError:
                raise Exception('Transaction output not found.')

            total_amount_in += int(out['amount'])

            if (inp.prev_tx_hash,out['hash']) not in self.db.unspent_txs_by_user_hash.get(out['address'], set()):
                raise Exception('Output of transaction already spent.')

            hash_string = '{}{}{}{}'.format(
                inp.prev_tx_hash, inp.output_index, inp.address, inp.index
            )
            try:
                rsa.verify(hash_string.encode(), binascii.unhexlify(inp.signature.encode()), Address(out['address']).key) == 'SHA-256'
            except:
                raise Exception('Signature verification failed: %s' % inp.as_dict)

        for out in outputs:
            total_amount_out += int(out.amount)

        if total_amount_in < total_amount_out:
            raise Exception('Insuficient funds.')

        return total_amount_in - total_amount_out

class BlockOutOfChain(Exception):
    pass

class BlockVerificationFailed(Exception):
    pass

class BlockVerifier:
    def __init__(self, db):
        self.db = db
        self.tv = TxVerifier(db)

    def verify(self, head, block):
        total_block_reward = int(self.db.config['mining_reward'])

        # verifying block hash
        if int(block.hash(), 16) > (2 ** (256-self.db.config['difficulty'])):
            raise BlockVerificationFailed('Block hash bigger then target difficulty')     

        # verifying transactions in a block
        for tx in block.txs[1:]:
            fee = self.tv.verify(tx.inputs, tx.outputs)
            total_block_reward += fee
        
        total_reward_out = 0
        for out in block.txs[0].outputs:
            total_reward_out += out.amount

        # verifying block reward
        if total_block_reward != total_reward_out:
            raise BlockVerificationFailed('Wrong reward sum')
        
        # verifying some other things
        if head:
            if head.index >= block.index:
                raise BlockOutOfChain('Block index number wrong')
            if head.hash() != block.prev_hash:
                raise BlockOutOfChain('New block not pointed to the head')
            if head.timestamp > block.timestamp:
                raise BlockOutOfChain('Block from the past')

        return True

One of the main parts of our system, as we need to be sure that the data that we get from the other nodes are valid.

Previous Inputs to our Txs controlled with internal DB, that updated with each block to remove needs of passing through thewhole blockchain(27GB now) to find needed data. Basically it’s how Blockchain is saved on nodes in a network.

Blockchain

from .blocks import Block, Tx, Input, Output
from .verifiers import TxVerifier, BlockOutOfChain, BlockVerifier, BlockVerificationFailed
import logging

logger = logging.getLogger('Blockchain')


class Blockchain: 

    __slots__ =  'max_nonce', 'chain', 'unconfirmed_transactions', 'db', 'wallet', 'on_new_block', 'on_prev_block', 'current_block_transactions', 'fork_blocks'

    def __init__(self, db, wallet, on_new_block=None, on_prev_block=None):
        self.max_nonce = 2**32
    
        self.db = db
        self.wallet = wallet
        self.on_new_block = on_new_block
        self.on_prev_block = on_prev_block

        self.unconfirmed_transactions = set()
        self.current_block_transactions = set()
        self.chain = []
        self.fork_blocks = {}    
 
    def create_first_block(self):
        """
        Creating first block in a chain. Only COINBASE Tx.
        """
        tx = self.create_coinbase_tx()
        block = Block([tx], 0, 0x0)
        self.mine_block(block)

    def create_coinbase_tx(self, fee=0):
        inp = Input('COINBASE',0,self.wallet.address,0)
        inp.sign(self.wallet)
        out = Output(self.wallet.address, self.db.config['mining_reward']+fee, 0)
        return Tx([inp],[out])

    def is_valid_block(self, block):
        bv = BlockVerifier(self.db)
        return bv.verify(self.head, block)

    def add_block(self, block):
        if self.head and block.hash() == self.head.hash():
            logger.error('Duplicate block')
            return False
        try:
            self.is_valid_block(block)
        except BlockOutOfChain:
            # Here we covering split brain case only for next 2 leves of blocks
            # with high difficulty its a rare case, and more then 2 level much more rare.
            if block.prev_hash == self.head.prev_hash:
                logger.error('Split Brain detected')
                self.fork_blocks[block.hash()] = block
                return False
            else:
                for b_hash, b in self.fork_blocks.items():
                    if block.prev_hash == b_hash:
                        logger.error('Split Brain fixed. Longer chain choosen')
                        self.rollback_block()
                        self.chain.append(b)
                        self.chain.append(block)
                        self.fork_blocks = {}
                        return True
                    logger.error('Second Split Brain detected. Not programmed to fix this')
                    return False
        except BlockVerificationFailed as e:
            logger.error('Block verification failed: %s' % e)
            return False
        else:        
            self.chain.append(block)
            self.fork_blocks = {}
            logger.info('   Block added')
            return True
        logger.error('Hard chain out of sync')

    def add_tx(self, tx):
        if self.db.transaction_by_hash.get(tx.hash):
            return False
        tv = TxVerifier(self.db)
        fee = tv.verify(tx.inputs, tx.outputs)
        self.db.transaction_by_hash[tx.hash] = tx.as_dict
        self.unconfirmed_transactions.add((fee, tx.hash))
        return True
       
    def force_block(self, check_stop=None):
        '''
        Forcing to mine block. Gthering all txs with some limit. First take Txs with bigger fee.
        '''
        txs = sorted(self.unconfirmed_transactions, key=lambda x:-x[0])[:self.db.config['txs_per_block']]
        self.current_block_transactions = set(txs)
        fee = sum([v[0] for v in txs])
        txs = [Tx.from_dict(self.db.transaction_by_hash[v[1]]) for v in txs ]
        block = Block(
            txs=[self.create_coinbase_tx(fee)] + txs,
            index=self.head.index+1,
            prev_hash=self.head.hash(),
        )
        self.mine_block(block, check_stop)

    def rollover_block(self, block):
        '''
        As we use some sort of DB, we need way to update it depends we need add block or remove.
        So we have 2 methods Rollover and Rollback.
        Also i added some sort of callback in case some additional functionality should be added on top.
        For example some Blockchain analytic DB.
        '''
        self.unconfirmed_transactions -= self.current_block_transactions
        self.db.block_index = block.index
        for tx in block.txs:
            self.db.transaction_by_hash[tx.hash] = tx.as_dict
            for out in tx.outputs:
                self.db.unspent_txs_by_user_hash[str(out.address)].add((tx.hash,out.hash))
                self.db.unspent_outputs_amount[str(out.address)][out.hash] = int(out.amount)
            for inp in tx.inputs:
                if inp.prev_tx_hash == 'COINBASE':
                    continue
                prev_out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
                self.db.unspent_txs_by_user_hash[prev_out['address']].remove((inp.prev_tx_hash,prev_out['hash']))
                del self.db.unspent_outputs_amount[prev_out['address']][prev_out['hash']]
        if self.on_new_block:
            self.on_new_block(block, self.db)
        self.current_block_transactions = set()

    def rollback_block(self):
        block = self.chain.pop()
        self.db.block_index -= 1
        total_amount_in = 0
        total_amount_out = 0

        for tx in block.txs:
            # removing new unspent outputs
            for out in tx.outputs:
                self.db.unspent_txs_by_user_hash[str(out.address)].remove((tx.hash,out.hash))
                del self.db.unspent_outputs_amount[str(out.address)][out.hash]
                total_amount_out += out.amount
            # adding back previous unspent outputs
            for inp in tx.inputs:
                if inp.prev_tx_hash == 'COINBASE':
                    continue
                prev_out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
                self.db.unspent_txs_by_user_hash[prev_out['address']].add((inp.prev_tx_hash,prev_out['hash']))
                self.db.unspent_outputs_amount[prev_out['address']][prev_out['hash']] = prev_out['amount']      
                total_amount_in += int(prev_out['amount'])

            # adding Tx back un unprocessed stack
            fee = total_amount_in - total_amount_out
            self.unconfirmed_transactions.add((fee,tx.hash))

        
        if self.on_prev_block:
            self.on_prev_block(block, self.db)

    def mine_block(self, block, check_stop=None):
        '''
        Mine a block with ability to stop in case if check callback return True
        '''
        for n in range(self.max_nonce):
            if check_stop and check_stop():
                logger.error('Mining interrupted.')
                return
            if int(block.hash(nonce=n), 16) <= (2 ** (256-self.db.config['difficulty'])):
                self.add_block(block)
                self.rollover_block(block)
                logger.info('  Block mined at nonce: %s' % n)
                break



    @property
    def head(self):
        if not self.chain:
            return None
        return self.chain[-1]

    @property
    def blockchain(self):
        return [el.as_dict for el in reversed(self.chain)]

Method force_block used to run mining of the new block by gathering some number of Txs ordered by fee and add Coinbase Tx to them.

After block added the chain we use rollover_block to update our DB with new data.

In case when new block(that we got from a different node) create Split Brain issue we use rollback_block to rollback the chain to the previous block and merge the new longest chain(code do not support multi level split brain, as it almost impossible in the real world)

There also some tests to verify that blockchain code works normaly.

Now we need to make this tun concurrently

Creating full-node with FastApi

I used FastApi as it fast, simple, use asyncio and can build OpenApi schema and debug UI from code(which is awesome).

from fastapi import FastAPI, BackgroundTasks, Request
import uvicorn
import requests
import asyncio
import logging
import sys

from models import *
from blockchain.db import DB
from blockchain.blockchain import Blockchain
from blockchain.wallet import Wallet
from blockchain.api import API
from blockchain.blocks import Input, Output, Tx

# Custom formatter
class ColorFormatter(logging.Formatter):

    def __init__(self, fmt="%(asctime)s - Blockchain - %(message)s"):
        super(ColorFormatter,self).__init__(fmt)
        red = '33[0;31m'
        nc = '33[0m'
        cyan = '33[0;96m'

        err_fmt  = f"{red}%(asctime)s - Blockchain{nc} - %(message)s"
        info_fmt = f"{cyan}%(asctime)s - Blockchain{nc} - %(message)s"
        self.err = logging.Formatter(err_fmt)
        self.log = logging.Formatter(info_fmt)

    def format(self, record):
        if record.levelno == logging.ERROR:
            return self.err.format(record)
        else:
            return self.log.format(record)


logger = logging.getLogger("Blockchain")


app = FastAPI()
app.config = {}
app.jobs = {}

### TASKS
def sync_data():
    logger.info('================== Sync started =================')
    bc = app.config['api']
    head = bc.get_head()
    while True:
        sync_running = False
        for node in app.config['nodes']:
            if node == ('%s:%s' % (app.config['host'],app.config['port'])):
                continue
            url = 'http://%s/chain/sync' % node
            start = head['index']+1 if head else 0
            while True:
                logger.info(url, {"from_block":start, "limit":20})
                res = requests.get(url, params={"from_block":start, "limit":20})
                if res.status_code == 200:
                    data = res.json()
                    if not data:
                        break
                    sync_running = True
                    for block in data:
                        try:
                            bc.add_block(block)
                        except Exception as e:
                            logger.exception(e)
                            return
                        else:
                            logger.info(f"Block added: #{block['index']}")
                    start += 20

            head = bc.get_head()
        if not sync_running:
            app.config['sync_running'] = False
            logger.info('================== Sync stopped =================')
            return
            
def broadcast(path, data, params=False, fiter_host=None):
    for node in list(app.config['nodes'])[:]:
        if node == ('%s:%s' % (app.config['host'],app.config['port'])) or fiter_host == node:
            continue
        url = 'http://%s%s' % (node,path)
        logger.info(f'Sending broadcast {url} except: {fiter_host}')
        try:
            # header added here as we run all nodes on one domain and need somehow understand the sender node
            # to not create broadcast loop
            if params:
                requests.post(url, params=data, timeout=2, headers={'node':'%s:%s' % (app.config['host'],app.config['port'])})   
            else:
                requests.post(url, json=data, timeout=2, headers={'node':'%s:%s' % (app.config['host'],app.config['port'])})
        except:
            pass

def mine(event):
    logger.info('>>>>>>>>>> Starting mining loop')
    # In real case you chould do like this, mining script should run in separate process
    while True:
        try:
            def check_stop():
                return event.is_set()
            logger.info(f'>> Starting new block mining')
            app.config['api'].mine_block(check_stop)
            logger.info(f'>> New block mined')
            broadcast('/chain/add_block', app.config['api'].get_head())
            if event.is_set():
                return
        except asyncio.CancelledError:
            logger.info('>>>>>>>>>> Mining loop stopped')
            return
        except Exception as e:
            logger.exception(e)


### SERVER OPERATIONS

@app.post("/chain/stop-mining")
async def stop_mining():
    if app.jobs.get('mining'):
        app.jobs['mining'].set()
        app.jobs['mining'] = None

@app.post("/chain/start-mining")
async def start_minig():
    if not app.jobs.get('mining'):
        loop = asyncio.get_running_loop()
        app.jobs['mining'] = asyncio.Event()
        loop.run_in_executor(None, mine, app.jobs['mining'])

@app.get("/server/nodes")
async def get_nodes():
    return app.config['nodes']

@app.post("/server/add_nodes")
async def add_nodes(nodes:NodesModel, request: Request):
    length = len(app.config['nodes'])
    app.config['nodes'] |= set(nodes.nodes)
    if length < len(app.config['nodes']):
        broadcast('/server/add_nodes', {'nodes':list(app.config['nodes'])}, False, request.headers.get('node'))
        logger.info(f'New nodes added: {nodes.nodes}')
    return {"success":True}

### DEMO OPERATIONS

@app.get("/demo/send_amount")
async def send_amount(address_to:str, amount:int, background_tasks: BackgroundTasks):
    '''Sending amount of coins from server wallet to some other wallet'''

    address_from = app.config['wallet'].address
    wallet = app.config['wallet']
    bc = app.config['api']
    unspent_txs = bc.get_user_unspent_txs(address_from)
    total = 0
    inputs = []
    i = 0
    try:
        while total < amount:
            prev = unspent_txs[i]
            inp = Input(prev['tx'],prev['output_index'],address_from,i)
            inp.sign(wallet)
            total += prev['amount']
            i += 1
            inputs.append(inp)
    except Exception as e:
        return {"success":False, "msg":str(e)}

    outs = [Output(address_to, amount, 0)]
    if total - amount > 0:
        outs.append(Output(address_from, total - amount, 1))

    tx = Tx(inputs,outs)
    try:
        res = bc.add_tx(tx.as_dict)
    except Exception as e:
        logger.exception(e)
        return {"success":False, "msg":str(e)}
    else:
        if res:
            logger.info(f'Tx added to the stack')
            background_tasks.add_task(broadcast, '/chain/tx_create', tx.as_dict, False)
            return {"success":True}
        logger.info('Tx already in stack. Skipped.')
        return {"success":False, "msg":"Duplicate"}
    

### ON CHAIN OPERATIONS

@app.get("/chain/get_amount")
async def get_wallet(address):
    bc = app.config['api']
    return {"address": address, "amount":bc.get_user_balance(address)}

@app.get("/chain/get_unspent_tx")
async def get_unspent_tx(address):
    bc = app.config['api']
    return {"address": address, "tx":bc.get_user_unspent_txs(address)}


@app.get("/chain/status")
async def status():
    bc = app.config['api']
    head = bc.get_head()
    if not head:
        return {'empty_node':True}
    return {
        'block_index':head['index'],
        'block_prev_hash':head['prev_hash'],
        'block_hash':head['hash'],
        'timestamp':head['timestamp']
    }

@app.get("/chain/sync")
async def sync(from_block:int, limit:int=20):
    bc = app.config['api']
    return bc.get_chain(from_block, limit)

@app.post("/chain/add_block")
async def add_block(block:BlockModel, background_tasks: BackgroundTasks, request: Request):
    logger.info(f"New block arived: #{block.index} from {request.headers.get('node')}")
    if app.config['sync_running']:
        logger.error(f'################### Not added, cause sync is running')
        return {"success":False, "msg":'Out of sync'}
    bc = app.config['api']
    head = bc.get_head()

    if (head['index'] + 1) < block.index:
        app.config['sync_running'] = True
        background_tasks.add_task(sync_data)
        logger.error(f'################### Not added, cause node out of sync.')
        return {"success":False, "msg":'Out of sync'}
    try:
        res = bc.add_block(block.dict())
        if res: restart_miner()
    except Exception as e:
        logger.exception(e)
        return {"success":False, "msg":str(e)}
    else:
        if res:
            logger.info('Block added to the chain')
            background_tasks.add_task(broadcast, '/chain/add_block', block.dict(), False, request.headers.get('node'))
            return {"success":True}
        logger.info('Old block. Skipped.')
        return {"success":False, "msg":"Duplicate"}

@app.post("/chain/tx_create")
async def add_tx(tx: TxModel, background_tasks: BackgroundTasks, request: Request):
    logger.info(f'New Tx arived')
    bc = app.config['api']
    try:
        res = bc.add_tx(tx.dict())
    except Exception as e:
        logger.exception(e)
        return {"success":False, "msg":str(e)}
    else:
        if res:
            logger.info(f'Tx added to the stack')
            background_tasks.add_task(broadcast, '/chain/tx_create', tx.dict(), False, request.headers.get('node'))
            return {"success":True}
        logger.info('Tx already in stack. Skipped.')
        return {"success":False, "msg":"Duplicate"}

@app.on_event("startup")
async def on_startup():
    app.config['sync_running'] = True
    loop = asyncio.get_running_loop()
    # sync data before run the node
    await loop.run_in_executor(None, sync_data)
    # add our node address to connected node to broadcast around network
    loop.run_in_executor(None, broadcast, '/server/add_nodes', {'nodes':['%s:%s' % (app.config['host'],app.config['port'])]}, False)
    if app.config['mine']:
        app.jobs['mining'] = asyncio.Event()
        loop.run_in_executor(None, mine, app.jobs['mining'])
    
@app.on_event("shutdown")
async def on_shutdown():
    if app.jobs.get('mining'):
        app.jobs.get('mining').set()

#### Utils ###########################
def restart_miner():
    if app.jobs.get('mining'):
        loop = asyncio.get_running_loop()
        app.jobs['mining'].set()
        app.jobs['mining'] = asyncio.Event()
        loop.run_in_executor(None, mine, app.jobs['mining'])

if __name__ == "__main__":

    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(ColorFormatter())
    handler.setLevel(logging.INFO)
    logger.addHandler(handler)

    import argparse
    parser = argparse.ArgumentParser(description='Blockchain full node.')
    parser.add_argument('--node', type=str, help='Address of node to connect. If not will init fist node.')
    parser.add_argument('--port', required=True, type=int, help='Port on which run the node.')
    parser.add_argument('--mine', required=False, type=bool, help='Port on which run the node.')
    parser.add_argument('--diff', required=False, type=int, help='Difficulty')

    args = parser.parse_args()
    _DB = DB()
    _DB.config['difficulty']
    _W = Wallet.create()
    _BC = Blockchain(_DB, _W)
    _API = API(_BC)
    logger.info(' ####### Server address: %s ########' %_W.address)

    app.config['db'] = _DB
    app.config['wallet'] = _W
    app.config['bc'] = _BC
    app.config['api'] = _API
    app.config['port'] = args.port  
    app.config['host'] = '127.0.0.1'
    app.config['nodes'] = set([args.node]) if args.node else set(['127.0.0.1:%s' % args.port])
    app.config['sync_running'] = False
    app.config['mine'] = args.mine

    if not args.node:
        _BC.create_first_block()

    uvicorn.run(app, host="127.0.0.1", port=args.port, access_log=True)

To wrap some additional functionality around blockchain code i added some API layer between node and blockchain.

Here we have 3 async tasks: mine, broadcast and sync_data.

We should run mining as a separate process, but this will add more code, so right now it just running in the same thread, which is ok for the test. Broadcast is used to spread data across known nodes. And Data Sync getting blockchain from the node on start or if get outperforming block.

Mining are running without any stop, block after block, if we dont add any Txs then it will have only coinbase transaction.
If we get a new block before mining of the block ends we stop mining and proceed with mining from the new block.

All duplicates Tx, Blocks removed and not broadcasted to the network.

Code repo: https://github.com/creotiv/full_node_blockchain

Tags

Join Hacker Noon

Create your free account to unlock your custom reading experience.

read original article here