# Source code for powerpool.reporters.base

import time
import gevent

from gevent import sleep
from gevent import spawn, GreenletExit
from gevent.queue import Queue
from hashlib import sha256
from binascii import hexlify

from ..lib import Component, loop
from ..utils import time_format
from ..stratum_server import StratumClient


class Reporter(Component):
    """ An abstract base class to document the Reporter interface. """
    def agent_send(self, address, worker, typ, data, time):
        """ Called when valid data is received from a PPAgent connection. """
        raise NotImplementedError
    def add_block(self, address, height, total_subsidy, fees, hex_bits,
                  hash, merged, worker, algo):
        """ Called when a share is submitted with a hash that is valid for
        the network. """
        raise NotImplementedError
    def log_share(self, client, diff, typ, params, job=None, header_hash=None,
                  header=None, start=None, **kwargs):
        """ Logs a share to external sources for payout calculation and
        statistics """
        #if __debug__:
        #    self.logger.debug(
        #        "Running log share with args {} kwargs {}"
        #        .format((client._id, diff, typ, params), dict(
        #            job=job, header_hash=header_hash, header=hexlify(header))))

        if typ == StratumClient.VALID_SHARE:
            self.logger.debug("Valid share accepted from worker {}.{}!"
                              .format(client.address, client.worker))
            # Grab the raw coinbase out of the job object before gevent can
            # preempt to another thread and change the value. Very important!
            coinbase_raw = job.coinbase.raw

            # Some coins use the POW function to compute the block hash, while
            # others use SHA256. Allow toggling which is used.
            if job.pow_block_hash:
                header_hash_raw = client.algo['module'](header)[::-1]
            else:
                header_hash_raw = sha256(sha256(header).digest()).digest()[::-1]
            hash_hex = hexlify(header_hash_raw)

            submission_threads = []
            # valid network hash?
            if header_hash <= job.bits_target:
                submission_threads.append(spawn(
                    job.found_block,
                    coinbase_raw,
                    client.address,
                    client.worker,
                    hash_hex,
                    header,
                    job,
                    start))

            # check each aux chain for validity
            for chain_id, data in job.merged_data.iteritems():
                if header_hash <= data['target']:
                    submission_threads.append(spawn(
                        data['found_block'],
                        client.address,
                        client.worker,
                        header,
                        coinbase_raw,
                        job,
                        start))

            for gl in gevent.iwait(submission_threads):
                ret = gl.value
                if ret:
                    spawn(self.add_block, **gl.value)
                else:
                    self.logger.error("Submission gl {} returned nothing!"
                                      .format(gl))
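
# --- Illustrative sketch, not part of the original module ---
# Shows the network-validity check that log_share performs above for a
# SHA256-based coin (job.pow_block_hash is False): double-SHA256 the raw
# header, reverse the digest to big-endian, and compare it to the network
# target. The header bytes and target below are hypothetical placeholders;
# in log_share they come from the stratum job.
def _example_network_share_check():
    header = b'\x00' * 80        # hypothetical 80-byte block header
    # double SHA256, reversed so it compares big-endian against the target
    header_hash_raw = sha256(sha256(header).digest()).digest()[::-1]
    hash_hex = hexlify(header_hash_raw)
    bits_target = 2 ** 255       # hypothetical, very easy target
    if int(hash_hex, 16) <= bits_target:
        return hash_hex          # log_share would spawn job.found_block here
    return None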
class StatReporter(Reporter):
    """ The stat reporter groups all shares into one minute chunks and
    reports them to allow separation of statistics reporting and payout
    related logging. """
    defaults = dict(pool_report_configs={}, chain=1, attrs={})
    gl_methods = ['_report_one_min']

    def __init__(self):
        self._minute_slices = {}
        self._per_address_slices = {}

    def log_one_minute(self, address, worker, algo, stamp, typ, amount):
        """ Called to log a minute's worth of shares that have been submitted
        by a unique (address, worker, algo). """
        raise NotImplementedError("If you're not logging the one minute "
                                  "chunks don't use the StatReporter!")

    def log_share(self, client, diff, typ, params, job=None, header_hash=None,
                  header=None, **kwargs):
        super(StatReporter, self).log_share(
            client, diff, typ, params, job=job, header_hash=header_hash,
            header=header, **kwargs)
        address, worker = client.address, client.worker
        algo = client.algo['name']
        slc_time = (int(time.time()) // 60) * 60
        slc = self._minute_slices.setdefault(slc_time, {})
        self._aggr_one_min(address, worker, algo, typ, diff, slc)
        currency = job.currency if job else "UNKNOWN"

        # log the share under user "pool" to allow easy/fast display of
        # pool stats
        for cfg in self.config['pool_report_configs']:
            user = cfg['user']
            pool_worker = cfg['worker_format_string'].format(
                algo=algo,
                currency=currency,
                server_name=self.manager.config['procname'],
                **self.config['attrs'])
            self._aggr_one_min(user, pool_worker, algo, typ, diff, slc)
            if cfg.get('report_merge') and job:
                for currency in job.merged_data:
                    pool_worker = cfg['worker_format_string'].format(
                        algo=algo,
                        currency=currency,
                        server_name=self.manager.config['procname'],
                        **self.config['attrs'])
                    self._aggr_one_min(user, pool_worker, algo, typ, diff, slc)

        # reporting for vardiff rates
        if typ == StratumClient.VALID_SHARE:
            slc = self._per_address_slices.setdefault(slc_time, {})
            if address not in slc:
                slc[address] = diff
            else:
                slc[address] += diff

    def _aggr_one_min(self, address, worker, algo, typ, amount, slc):
        key = (address, worker, algo, typ)
        if key not in slc:
            slc[key] = amount
        else:
            slc[key] += amount

    def _flush_one_min(self, exit_exc=None, caller=None):
        self._process_minute_slices(flush=True)
        self.logger.info("One minute flush complete, Exit.")

    @loop(interval=61, precise=60, fin="_flush_one_min")
    def _report_one_min(self):
        self._process_minute_slices()

    def _process_minute_slices(self, flush=False):
        """ Goes through our internal aggregated share data structures and
        reports them to our external storage. If asked to flush it will
        report all one minute shares, otherwise it will only report minutes
        that have passed. """
        self.logger.info("Reporting one minute shares for address/workers")
        t = time.time()
        if not flush:
            upper = (int(t) // 60) * 60
        for stamp, data in self._minute_slices.items():
            if flush or stamp < upper:
                for (address, worker, algo, typ), amount in data.iteritems():
                    self.log_one_minute(address, worker, algo, stamp, typ,
                                        amount)
                    # XXX: GreenletExit getting raised here might cause some
                    # double reporting!
                del self._minute_slices[stamp]

        self.logger.info("One minute shares reported in {}"
                         .format(time_format(time.time() - t)))

        # Clean up old per address slices as well
        ten_ago = ((time.time() // 60) * 60) - 600
        for stamp in self._per_address_slices.keys():
            if stamp < ten_ago:
                del self._per_address_slices[stamp]

    def spm(self, address):
        """ Called by the client code to determine how many shares per minute
        are currently being submitted. Slices older than ten minutes are
        pruned by _process_minute_slices, so at most ten buckets get
        averaged. """
        mins = 0
        total = 0
        for stamp in self._per_address_slices.keys():
            val = self._per_address_slices[stamp].get(address)
            if val is not None:
                total += val
                mins += 1

        return total / (mins or 1)  # or 1 prevents division by zero error
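
# --- Illustrative sketch, not part of the original module ---
# Shows how StatReporter buckets shares: submission times are floored to the
# minute, and share difficulty aggregates under an (address, worker, algo,
# typ) key inside that minute's slice. All values below are hypothetical.
def _example_minute_bucketing():
    stamp = 1414791337                  # hypothetical submission time
    slc_time = (stamp // 60) * 60       # floors to 1414791300, the bucket key
    slc = {}
    key = ("some_address", "worker1", "scrypt", StratumClient.VALID_SHARE)
    slc[key] = slc.get(key, 0) + 512    # accumulate share difficulty
    # spm() later averages the per-address totals over the buckets seen
    return {slc_time: slc}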
""" mins = 0 total = 0 for stamp in self._per_address_slices.keys(): val = self._per_address_slices[stamp].get(address) if val is not None: total += val mins += 1 return total / (mins or 1) # or 1 prevents divison by zero error class QueueStatReporter(StatReporter): def _start_queue(self): self.queue = Queue() def _flush_queue(self, exit_exc=None, caller=None): sleep(1) self.logger.info("Flushing a queue of size {}" .format(self.queue.qsize())) self.queue.put(StopIteration) for item in self.queue: self._run_queue_item(item) self.logger.info("Queue flush complete, Exit.") @loop(setup='_start_queue', fin='_flush_queue') def _queue_proc(self): item = self.queue.get() if self._run_queue_item(item) == "retry": # Put it at the back of the queue for retry self.queue.put(item) sleep(1) def _run_queue_item(self, item): name, args, kwargs = item if __debug__: self.logger.debug("Queue running {} with args '{}' kwargs '{}'" .format(name, args, kwargs)) try: func = getattr(self, name, None) if func is None: raise NotImplementedError( "Item {} has been enqueued that has no valid function!" .format(name)) func(*args, **kwargs) except self.queue_exceptions as e: self.logger.error("Unable to process queue item, retrying! " "{} Name: {}; Args: {}; Kwargs: {};" .format(e, name, args, kwargs)) return "retry" except Exception: # Log any unexpected problem, but don't retry because we might # end up endlessly retrying with same failure self.logger.error("Unkown error, queue data discarded!" "Name: {}; Args: {}; Kwargs: {};" .format(name, args, kwargs), exc_info=True) def log_one_minute(self, *args, **kwargs): self.queue.put(("_queue_log_one_minute", args, kwargs)) def add_block(self, *args, **kwargs): self.queue.put(("_queue_add_block", args, kwargs)) def _queue_add_block(self, address, height, total_subsidy, fees, hex_bits, hex_hash, currency, algo, merged=False, worker=None, **kwargs): raise NotImplementedError def _queue_log_one_minute(self, address, worker, algo, stamp, typ, amount): raise NotImplementedError