Source code for powerpool.stratum_server

import json
import socket
import datetime
import argparse
import struct
import random
import time
import weakref

from collections import deque
from binascii import hexlify, unhexlify
from cryptokit import target_from_diff, uint256_from_str
from gevent import sleep, with_timeout
from gevent.queue import Queue
from gevent.pool import Pool
from gevent.server import StreamServer
from pprint import pformat

from .agent_server import AgentServer, AgentClient
from .exceptions import LoopExit
from .server import GenericClient
from .utils import time_format
from .exceptions import ConfigurationError
from .lib import Component, loop, REQUIRED


class ArgumentParserError(Exception):
    pass


class ThrowingArgumentParser(argparse.ArgumentParser):
    def error(self, message):
        raise ArgumentParserError(message)


password_arg_parser = ThrowingArgumentParser()
password_arg_parser.add_argument('-d', '--diff', type=float)


[docs]class StratumServer(Component, StreamServer): """ A single port binding of our stratum server. """ one_min_stats = ['stratum_connects', 'stratum_disconnects', 'agent_connects', 'agent_disconnects', 'reject_low_share_n1', 'reject_dup_share_n1', 'reject_stale_share_n1', 'acc_share_n1', 'reject_low_share_count', 'reject_dup_share_count', 'reject_stale_share_count', 'acc_share_count', 'unk_err', 'not_authed_err', 'not_subbed_err'] # enhance readability by reducing magic number use... defaults = dict(address="0.0.0.0", port=3333, start_difficulty=128, server_seed=0, reporter=None, jobmanager=None, algo=REQUIRED, idle_worker_threshold=300, aliases={}, valid_address_versions=[], donate_key="donate", vardiff=dict(enabled=False, spm_target=20, interval=30, tiers=[8, 16, 32, 64, 96, 128, 192, 256, 512]), minimum_manual_diff=64, push_job_interval=30, idle_worker_disconnect_threshold=3600, agent=dict(enabled=False, port_diff=1111, timeout=120, accepted_types=['temp', 'status', 'hashrate', 'thresholds'])) # Don't spawn a greenlet to handle creation of clients, we start one for # reading and one for writing in their own class... _spawn = None def __init__(self, config): self._configure(config) self.agent_servers = [] # Start a corresponding agent server if self.config['agent']['enabled']: serv = AgentServer(self) self.agent_servers.append(serv) # A dictionary of all connected clients indexed by id self.clients = {} self.agent_clients = {} # A dictionary of lists of connected clients indexed by address self.address_lut = {} # A dictionary of lists of connected clients indexed by address and # worker tuple self.address_worker_lut = {} # counters that allow quick display of these numbers. stratum only self.authed_clients = 0 self.idle_clients = 0 # Unique client ID counters for stratum and agents. We want them to # unique across all clients on all servers, so we seed with some unique # starting number self.stratum_id_count = (self.config['port'] * 10000 + self.config['server_seed']) self.agent_id_count = 0 # Track the last job we pushed and when we pushed it self.last_flush_job = None self.last_flush_time = None self.last_job = None self.last_time = None self.listener = None # Keep track of the jobs we've recieved, both recently flushed and # active self.active_jobs = set() # Nothing ever actually needs to be looked up in this object, it merely # has to hold reference to the job objects so they don't get garbage # collected self.stale_jobs = deque([], maxlen=10)
[docs] def start(self, *args, **kwargs): self.listener = (self.config['address'], self.config['port'] + self.manager.config['server_number']) StreamServer.__init__(self, self.listener, spawn=Pool()) self.algo = self.manager.algos[self.config['algo']] if not self.config['reporter'] and len(self.manager.component_types['Reporter']) == 1: self.reporter = self.manager.component_types['Reporter'][0] elif not self.config['reporter']: raise ConfigurationError( "There are more than one Reporter components, target reporter" "must be specified explicitly!") else: self.reporter = self._lookup(self.config['reporter']) if not self.config['jobmanager'] and len(self.manager.component_types['Jobmanager']) == 1: self.jobmanager = self.manager.component_types['Jobmanager'][0] elif not self.config['jobmanager']: raise ConfigurationError( "There are more than one Jobmanager components, target jobmanager " "must be specified explicitly!") else: self.jobmanager = self._lookup(self.config['jobmanager']) self.jobmanager.new_job.rawlink(self.new_job) self.logger.info("Stratum server starting up on {}".format(self.listener)) for serv in self.agent_servers: serv.start() StreamServer.start(self, *args, **kwargs) Component.start(self)
[docs] def stop(self, *args, **kwargs): self.logger.info("Stratum server {} stopping".format(self.listener)) StreamServer.close(self) for serv in self.agent_servers: serv.stop() for client in self.clients.values(): client.stop() StreamServer.stop(self) Component.stop(self) self.logger.info("Exit")
[docs] def handle(self, sock, address): """ A new connection appears on the server, so setup a new StratumClient object to manage it. """ self.logger.info("Recieving stratum connection from addr {} on sock {}" .format(address, sock)) self.stratum_id_count += 1 client = StratumClient( sock, address, config=self.config, logger=self.logger, manager=self.manager, algo=self.algo, server=self, reporter=self.reporter) client.start()
[docs] def new_job(self, event): """ Gets called whenever there's a new job generated by our jobmanager. """ job = event.job t = time.time() job.stratum_string() flush = job.type == 0 if flush: # Push the old jobs onto the stale list and generate a new # blank jobs dict self.stale_jobs.append(self.active_jobs) self.active_jobs = set() self.last_job = job self.last_time = t self.active_jobs.add(job) if job.type in [0, 1]: for client in self.clients.itervalues(): if client.authenticated: client._push(job, flush=flush, block=False) self.logger.info( "New job enqueued for transmission to {} users in {}" .format(len(self.clients), time_format(time.time() - t))) if flush: self.last_flush_job = job self.last_flush_time = time.time()
@property def status(self): """ For display in the http monitor """ hps = (self.algo['hashes_per_share'] * self.counters['acc_share_n1'].minute / 60.0) dct = dict(mhps=hps / 1000000.0, hps=hps, last_flush_job=None, agent_client_count=len(self.agent_clients), client_count=len(self.clients), address_count=len(self.address_lut), address_worker_count=len(self.address_lut), active_jobs=len(self.active_jobs), stale_jobs=sum([len(j) for j in self.stale_jobs]), client_count_authed=self.authed_clients, client_count_active=len(self.clients) - self.idle_clients, client_count_idle=self.idle_clients) if self.last_flush_job: j = self.last_flush_job dct['last_flush_job'] = dict( algo=j.algo, pow_block_hash=j.pow_block_hash, currency=j.currency, job_id=j.job_id, merged_networks=j.merged_data.keys(), pushed_at=self.last_flush_time ) return dct
[docs] def set_user(self, client): """ Add the client (or create) appropriate worker and address trackers """ user_worker = (client.address, client.worker) self.address_worker_lut.setdefault(user_worker, []) self.address_worker_lut[user_worker].append(client) self.authed_clients += 1 self.address_lut.setdefault(user_worker[0], []) self.address_lut[user_worker[0]].append(client)
[docs] def add_client(self, client): if isinstance(client, StratumClient): self._incr('stratum_connects') self.clients[client._id] = client elif isinstance(client, AgentClient): self._incr('agent_connects') self.agent_clients[client._id] = client else: self.logger.warn("Add client got unknown client of type {}" .format(type(client)))
[docs] def remove_client(self, client): """ Manages removing the StratumClient from the luts """ if isinstance(client, StratumClient): del self.clients[client._id] address, worker = client.address, client.worker self._incr('stratum_disconnects') if client.authenticated: self.authed_clients -= 1 if client.idle: self.idle_clients -= 1 # it won't appear in the luts if these values were never set if address is None and worker is None: return # wipe the client from the address tracker if address in self.address_lut: # remove from lut for address self.address_lut[address].remove(client) # if it's the last client in the object, delete the entry if not len(self.address_lut[address]): del self.address_lut[address] # wipe the client from the address/worker lut key = (address, worker) if key in self.address_worker_lut: self.address_worker_lut[key].remove(client) # if it's the last client in the object, delete the entry if not len(self.address_worker_lut[key]): del self.address_worker_lut[key] elif isinstance(client, AgentClient): self._incr('agent_disconnects') del self.agent_clients[client._id] else: self.logger.warn("Remove client got unknown client of type {}" .format(type(client)))
[docs]class StratumClient(GenericClient): """ Object representation of a single stratum connection to the server. """ # Stratum error codes errors = {20: 'Other/Unknown', 21: 'Job not found (=stale)', 22: 'Duplicate share', 23: 'Low difficulty share', 24: 'Unauthorized worker', 25: 'Not subscribed'} error_counter = {20: 'unk_err', 24: 'not_authed_err', 25: 'not_subbed_err'} # enhance readability by reducing magic number use... STALE_SHARE_ERR = 21 LOW_DIFF_ERR = 23 DUP_SHARE_ERR = 22 # constansts for share submission outcomes. returned by the share checker VALID_SHARE = 0 DUP_SHARE = 1 LOW_DIFF_SHARE = 2 STALE_SHARE = 3 share_type_strings = {0: "acc", 1: "dup", 2: "low", 3: "stale"} def __init__(self, sock, address, logger, manager, server, reporter, algo, config): self.config = config self.manager = manager self.algo = algo self.server = server self.reporter = reporter self.logger = logger self.sock = sock self.address = address # Seconds before sending keepalive probes sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 120) # Interval in seconds between keepalive probes sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1) # Failed keepalive probles before declaring other end dead sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5) self.authenticated = False self.subscribed = False # flags for current connection state self.idle = False self.address = None self.worker = None self.client_type = None # the worker id. this is also extranonce 1 id = self.server.stratum_id_count if self.manager.config['extranonce_serv_size'] == 8: self._id = hexlify(struct.pack('Q', id)) elif self.manager.config['extranonce_serv_size'] == 4: self._id = hexlify(struct.pack('I', id)) else: raise Exception("Unsupported extranonce size!") t = time.time() # running total for vardiff self.accepted_shares = 0 # an index of jobs and their difficulty self.job_mapper = {} self.old_job_mapper = {} self.job_counter = random.randint(0, 100000) # Allows us to avoid a bunch of clients getting scheduled at the same # time by offsetting most timing values by this self.time_seed = random.uniform(0, 10) # Used to determine if they're idle self.last_share_submit = t # Used to determine if we should send another job on read loop timeout self.last_job_push = t # Avoids repeat pushing jobs that the client already knows about self.last_job = None # Last time vardiff happened self.last_diff_adj = t - self.time_seed # Current difficulty setting self.difficulty = self.config['start_difficulty'] # the next diff to be used by push job self.next_diff = self.config['start_difficulty'] # What time the user connected... self.connection_time = int(t) # where we put all the messages that need to go out self.write_queue = Queue() self.fp = None self._stopped = False
[docs] def _incr(self, *args): self.server._incr(*args)
[docs] def send_error(self, num=20, id_val=1): """ Utility for transmitting an error to the client """ err = {'id': id_val, 'result': None, 'error': (num, self.errors[num], None)} if __debug__: self.logger.debug("Error number {}".format(num, self.peer_name[0])) self.write_queue.put(json.dumps(err, separators=(',', ':')) + "\n")
[docs] def send_success(self, id_val=1): """ Utility for transmitting success to the client """ succ = {'id': id_val, 'result': True, 'error': None} if __debug__: self.logger.debug("success response: {}".format(pformat(succ))) self.write_queue.put(json.dumps(succ, separators=(',', ':')) + "\n")
[docs] def push_difficulty(self): """ Pushes the current difficulty to the client. Currently this only happens uppon initial connect, but would be used for vardiff """ send = {'params': [self.difficulty], 'id': None, 'method': 'mining.set_difficulty'} self.write_queue.put(json.dumps(send, separators=(',', ':')) + "\n")
[docs] def push_job(self, flush=False, timeout=False): """ Pushes the latest job down to the client. Flush is whether or not he should dump his previous jobs or not. Dump will occur when a new block is found since work on the old block is invalid.""" job = None while job is None: job = self.server.last_job if job is None: self.logger.warn("No jobs available for worker!") sleep(0.1) if self.last_job == job and not timeout: self.logger.info("Ignoring non timeout resend of job id {} to worker {}.{}" .format(job.job_id, self.address, self.worker)) return # we push the next difficulty here instead of in the vardiff block to # prevent a potential mismatch between client and server if self.next_diff != self.difficulty: self.logger.info( "Pushing diff update {} -> {} before job for {}.{}" .format(self.difficulty, self.next_diff, self.address, self.worker)) self.difficulty = self.next_diff self.push_difficulty() if __debug__: self.logger.debug("Sending job id {} to worker {}.{}{}" .format(job.job_id, self.address, self.worker, " after timeout" if timeout else '')) self._push(job)
[docs] def _push(self, job, flush=False, block=True): """ Abbreviated push update that will occur when pushing new block notifications. Mico-optimized to try and cut stale share rates as much as possible. """ self.last_job = job self.last_job_push = time.time() # get client local job id to map current difficulty self.job_counter += 1 if self.job_counter % 100 == 0: # Run a swap to avoid GC tmp = self.job_mapper self.old_job_mapper = self.job_mapper self.job_mapper = tmp self.job_mapper.clear() job_id = str(self.job_counter) self.job_mapper[job_id] = (self.difficulty, weakref.ref(job)) self.write_queue.put(job.stratum_string() % (job_id, "true" if flush else "false"), block=block)
[docs] def submit_job(self, data, t): """ Handles recieving work submission and checking that it is valid , if it meets network diff, etc. Sends reply to stratum client. """ params = data['params'] # [worker_name, job_id, extranonce2, ntime, nonce] # ["slush.miner1", "bf", "00000001", "504e86ed", "b2957c02"] if __debug__: self.logger.debug( "Recieved work submit:\n\tworker_name: {0}\n\t" "job_id: {1}\n\textranonce2: {2}\n\t" "ntime: {3}\n\tnonce: {4} ({int_nonce})" .format( *params, int_nonce=struct.unpack(str("<L"), unhexlify(params[4])))) if self.idle: self.idle = False self.server.idle_clients -= 1 self.last_share_submit = time.time() try: difficulty, job = self.job_mapper[data['params'][1]] job = job() # weakref will be None if the job has been GCed except KeyError: try: difficulty, job = self.old_job_mapper[data['params'][1]] job = job() # weakref will be None if the job has been GCed except KeyError: job = None # Job not in jobmapper at all, we got a bogus submit # since we can't identify the diff we just have to assume it's # current diff difficulty = self.difficulty if job not in self.server.active_jobs: self.send_error(self.STALE_SHARE_ERR, id_val=data['id']) self.reporter.log_share(client=self, diff=self.difficulty, typ=self.STALE_SHARE, params=params, job=job, start=t) return difficulty, self.STALE_SHARE # assemble a complete block header bytestring header = job.block_header( nonce=params[4], extra1=self._id, extra2=params[2], ntime=params[3]) # Check a submitted share against previous shares to eliminate # duplicates share_lower = (self._id.lower(), params[2].lower(), params[4].lower(), params[3].lower()) if share_lower in job.acc_shares: self.logger.info("Duplicate share rejected from worker {}.{}!" .format(self.address, self.worker)) self.send_error(self.DUP_SHARE_ERR, id_val=data['id']) self.reporter.log_share(client=self, diff=difficulty, typ=self.DUP_SHARE, params=params, job=job, start=t) return difficulty, self.DUP_SHARE job_target = target_from_diff(difficulty, job.diff1) hash_int = uint256_from_str(self.algo['module'](header)) if hash_int >= job_target: self.logger.info("Low diff share rejected from worker {}.{}!" .format(self.address, self.worker)) self.send_error(self.LOW_DIFF_ERR, id_val=data['id']) self.reporter.log_share(client=self, diff=difficulty, typ=self.LOW_DIFF_SHARE, params=params, job=job, start=t) return difficulty, self.LOW_DIFF_SHARE # we want to send an ack ASAP, so do it here self.send_success(id_val=data['id']) # Add the share to the accepted set to check for dups job.acc_shares.add(share_lower) self.accepted_shares += difficulty self.reporter.log_share(client=self, diff=difficulty, typ=self.VALID_SHARE, params=params, job=job, header_hash=hash_int, header=header, start=t) return difficulty, self.VALID_SHARE
[docs] def recalc_vardiff(self): # ideal difficulty is the n1 shares they solved divided by target # shares per minute spm_tar = self.config['vardiff']['spm_target'] ideal_diff = self.reporter.spm(self.address) / spm_tar if __debug__: self.logger.debug("VARDIFF: Calculated client {} ideal diff {}" .format(self._id, ideal_diff)) # find the closest tier for them new_diff = min(self.config['vardiff']['tiers'], key=lambda x: abs(x - ideal_diff)) self.last_diff_adj = time.time() if new_diff != self.difficulty: self.logger.info( "VARDIFF: Moving to D{} from D{} on {}.{}" .format(new_diff, self.difficulty, self.address, self.worker)) self.next_diff = new_diff self.push_job(timeout=True) return True elif __debug__: self.logger.debug("VARDIFF: Not adjusting difficulty, already " "close enough") return False
@loop(fin='stop', exit_exceptions=(socket.error, ))
[docs] def read(self): # designed to time out approximately "push_job_interval" after the user # last recieved a job. Some miners will consider the mining server dead # if they don't recieve something at least once a minute, regardless of # whether a new job is _needed_. This aims to send a job _only_ as # often as needed line = with_timeout(time.time() - self.last_job_push + self.config['push_job_interval'] - self.time_seed, self.fp.readline, timeout_value='timeout') if line == 'timeout': t = time.time() # Set idle status if they haven't submitted in the perscribed time if not self.idle and (t - self.last_share_submit) > self.config['idle_worker_threshold']: self.idle = True self.server.idle_clients += 1 # Disconnect if we havne't heard from them in a while if (t - self.last_share_submit) > self.config['idle_worker_disconnect_threshold']: self.logger.info("Disconnecting worker {}.{} at ip {} for inactivity" .format(self.address, self.worker, self.peer_name[0])) self.stop() # push a new job if if (self.authenticated is True and # don't send to non-authed # force send if we need to push a new difficulty (self.next_diff != self.difficulty or # send if we're past the push interval t > (self.last_job_push + self.config['push_job_interval'] - self.time_seed))): # Since they might not be submitting jobs due to low hashrate, # check vardiff on timeout in addition to on mining submit if self.config['vardiff']['enabled'] is True: # If recalc didn't need to adjust then we need to push # because we're timed out if not self.recalc_vardiff(): self.push_job(timeout=True) else: self.push_job(timeout=True) # Continue loop, we just sent new job after timeout return line = line.strip() # Reading from a defunct connection yeilds an EOF character which gets # stripped off if not line: raise LoopExit("Closed file descriptor encountered") try: data = json.loads(line) except ValueError: self.logger.warn("Data {}.. not JSON".format(line[:15])) self.send_error() self._incr('unk_err') return # handle malformed data data.setdefault('id', 1) data.setdefault('params', []) if __debug__: self.logger.debug("Data {} recieved on client {}".format(data, self._id)) # run a different function depending on the action requested from # user if 'method' not in data: self.logger.warn("Empty action in JSON {}".format(self.peer_name[0])) self._incr('unk_err') self.send_error(id_val=data['id']) return meth = data['method'].lower() if meth == 'mining.subscribe': if self.subscribed is True: self.send_error(id_val=data['id']) return try: self.client_type = data['params'][0] except IndexError: pass ret = { 'result': ( ( # These values aren't used for anything, although # perhaps they should be ("mining.set_difficulty", self._id), ("mining.notify", self._id) ), self._id, self.manager.config['extranonce_size'] ), 'error': None, 'id': data['id'] } self.subscribed = True if __debug__: self.logger.debug("Sending subscribe response: {}" .format(pformat(ret))) self.write_queue.put(json.dumps(ret) + "\n") elif meth == "mining.authorize": if self.subscribed is False: self._incr('not_subbed_err') self.send_error(25, id_val=data['id']) return if self.authenticated is True: self._incr('not_authed_err') self.send_error(24, id_val=data['id']) return try: password = data['params'][1] username = data['params'][0] # allow the user to use the password field as an argument field try: args = password_arg_parser.parse_args(password.split()) except ArgumentParserError: # Ignore malformed parser data pass else: if args.diff: diff = max(self.config['minimum_manual_diff'], args.diff) self.difficulty = diff self.next_diff = diff except IndexError: password = "" username = "" self.manager.log_event( "{name}.auth:1|c".format(name=self.manager.config['procname'])) self.logger.info("Authentication request from {} for username {}" .format(self.peer_name[0], username)) user_worker = self.convert_username(username) # unpack into state dictionary self.address, self.worker = user_worker self.authenticated = True self.server.set_user(self) # notify of success authing and send him current diff and latest # job self.send_success(data['id']) self.push_difficulty() self.push_job() elif meth == "mining.submit": if self.authenticated is False: self._incr('not_authed_err') self.send_error(24, id_val=data['id']) return t = time.time() diff, typ = self.submit_job(data, t) # Log the share to our stat counters key = "" if typ > 0: key += "reject_" key += StratumClient.share_type_strings[typ] + "_share" if typ == 0: # Increment valid shares to calculate hashrate self._incr(key + "_n1", diff) self.manager.log_event( "{name}.{type}:1|c\n" "{name}.{type}_n1:{diff}|c\n" "{name}.submit_time:{t}|ms" .format(name=self.manager.config['procname'], type=key, diff=diff, t=(time.time() - t) * 1000)) # don't recalc their diff more often than interval if (self.config['vardiff']['enabled'] is True and (t - self.last_diff_adj) > self.config['vardiff']['interval']): self.recalc_vardiff() elif meth == "mining.get_transactions": self.send_error(id_val=data['id']) elif meth == "mining.extranonce.subscribe": self.send_success(id_val=data['id']) else: self.logger.info("Unkown action {} for command {}" .format(data['method'][:20], self.peer_name[0])) self._incr('unk_err') self.send_error(id_val=data['id'])
@property def summary(self): """ Displayed on the all client view in the http status monitor """ return dict(worker=self.worker, idle=self.idle) @property def last_share_submit_delta(self): return datetime.datetime.utcnow() - datetime.datetime.utcfromtimestamp(self.last_share_submit) @property def details(self): """ Displayed on the single client view in the http status monitor """ return dict(alltime_accepted_shares=self.accepted_shares, difficulty=self.difficulty, type=self.client_type, worker=self.worker, id=self._id, jobmapper_size=len(self.old_job_mapper) + len(self.job_mapper), last_share_submit=str(self.last_share_submit_delta), idle=self.idle, address=self.address, ip_address=self.peer_name[0], connection_time=str(self.connection_duration))