2016-02-11 2 views
0

Я пытаюсь использовать новую регистрацию api из скрученной для отправки журналов на сервер старта.Вход в logstash с Twisted

Без скручены, вот что я использую:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from __future__ import division, absolute_import, \ 
    print_function, unicode_literals 

import json 
import logging 
import logstash 

logger = logging.getLogger('python-logstash-logger') 
logger.setLevel(logging.DEBUG) 
logger.addHandler(logstash.TCPLogstashHandler(b'127.0.0.1', 5001, version=1)) 
logger.addHandler(logging.StreamHandler()) 
a = dict(a=1, source='testing', _id='Test') 
logger.debug(json.dumps(a)) 

Я не understant, как реализовать этот код в скрученном философии, я думаю, что я должен реализовать наблюдателя, который направляет журналы на сервер logstash , но у меня возникают проблемы с поиском каких-либо примеров того, как достичь этого.

Я сделал простую попытку, которая не работает, если кто-то может мне точку в правильном направлении:

from logging.config import dictConfig 
from twisted.logger import Logger, STDLibLogObserver 

from txacme.application import app 

dictConfig(app.settings.get(path='txacme.log')) 

l = Logger(observer=STDLibLogObserver('txacme')) 
data = dict(a=1, source='coucou', _id='Test') 
l.info(data) 
l.error('Hello') 

и конфигурации я использую:

txacme: 
    log: 
     version: 1 
     disable_existing_loggers: False 

     formatters: 
      minimal: 
      format: '[%(asctime)-15s] %(message)s' 
      simple: 
      format: '[%(asctime)-15s][%(levelname)s][%(module)s][%(funcName)s] %(message)s' 

     handlers: 
      console: 
      class: logging.StreamHandler 
      level: !!python/name:logging.DEBUG 
      formatter: simple 
      stream: ext://sys.stdout 

      logstash_tcp: &LOGSTASH 
      class: logstash.TCPLogstashHandler 
      level: !!python/name:logging.DEBUG 
      version: 1 
      host: 127.0.0.1 
      port: 5001 
      message_type: acme 
      tags: [acme, prod] 

     loggers: 
      txacme: &acme 
      level: !!python/name:logging.DEBUG 
      handlers: [logstash_tcp] 
      propagate: no 

Я также нашел пример consumer for RabbitMQ, но я не уверен, как переносить это на logstash.

ответ

1

я наконец закончил с созданием протокола/фабрик для подключения к logstash удаленного сервера и LogObserver, который передает данные в асинхронном режиме (огнь и забыть):

Организация код вдохновлен из twisted.logger пакета, это может быть использована следующим образом:

settings = { 
    'host': 10.68.0.41, 
    'port': 5001, 
    'version': 1} 
logstashObserver = LogstashLogObserver(**settings) 

__init__.py

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from ._logstash import LogstashLogObserver 

__all__ = (
    # From ._logstash 
    'LogstashLogObserver', 
) 

_logstash.py

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function 

import inspect 

from twisted import logger 
from twisted.internet import defer 
from twisted.internet import endpoints 
from twisted.internet import protocol 
from twisted.internet import reactor 
from twisted.internet import task 
from twisted.protocols import basic 
from zope import interface 

from . import _formatter 


# use the json event formatter from the twisted logging system 
def _formatEvent(event): 
    l = logger.jsonFileLogObserver(None, u'') 
    return l.formatEvent(event).strip() 


class LogstashClient(basic.LineReceiver): 
    def connectionMade(self): 
     try: 
      self.transport.setTcpKeepAlive(1) 
     except AttributeError: 
      pass 

    def emit(self, event): 
     self.sendLine(event) 
     self.transport.loseConnection() 
     self.factory.eventEmitted(event) 


class LogstashFactory(protocol.ReconnectingClientFactory): 
    protocol = LogstashClient 

    def __init__(self): 
     self.clientRequests = [] 
     self.eventRequests = [] 
     self.connected = False 

    def connectionMade(self, protocol): 
     ds = self.clientRequests 
     self.clientRequests = [] 
     for d in ds: 
      d.callback(protocol) 

    def eventEmitted(self, event): 
     ds = self.eventRequests 
     self.eventRequests = [] 
     for d in ds: 
      d.callback(event) 


# start the logging factory once, it will reconnect automatically 
_factory = LogstashFactory() 


@interface.implementer(logger.ILogObserver) 
class LogstashLogObserver(object): 
    def __init__(self, host, port=5959, prefix=None, message_type='logstash', 
       tags=None, fqdn=False, version=0): 
     self.host = host 
     self.port = port 
     if version == 1: 
      formatter = _formatter.LogstashFormatterVersion1 
     else: 
      formatter = _formatter.LogstashFormatterVersion0 
     self.formatter = formatter(prefix, message_type, tags, fqdn) 

    def __call__(self, event): 
     # log_ prefix is the one used by twisted, risk of collision... 
     # see https://twistedmatrix.com/documents/15.2.1/core/howto/logger.html 
     event['log_stack'] = inspect.stack() 
     event = self.formatter.format(event) 
     d = task.deferLater(reactor, 0, self._connect, reactor) 
     d.addCallback(lambda client, event: client.emit(event), event) 

    def _connect(self, reactor=None): 
     if reactor is None: 
      from twisted.internet import reactor 

     endpoint = endpoints.TCP4ClientEndpoint(reactor, self.host, self.port) 
     return endpoint.connect(_factory) 

_formatter.py

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function 


import logging 
import socket 
import sys 
from datetime import datetime 

from twisted import logger 

try: 
    import json 
except ImportError: 
    import simplejson as json 


class LogstashBaseFormatter(logging.Formatter): 
    def __init__(self, prefix=None, message_type='Logstash', tags=None, fqdn=False): 
     self.prefix = prefix 
     self.message_type = message_type 
     self.tags = tags if tags is not None else [] 

     if fqdn: 
      self.host = socket.getfqdn() 
     else: 
      self.host = socket.gethostname() 

    def get_debug_fields(self, record): 
     failure = record['log_failure'] 

     try: 
      traceback = failure.getTraceback() 
     except Exception: 
      traceback = u"(UNABLE TO OBTAIN TRACEBACK FROM EVENT)\n" 

     innermost_frame = failure.frames.pop(0) 
     fields = { 
      'type': str(failure.type), 
      'module': innermost_frame[0], 
      'file': innermost_frame[1], 
      'lineno': innermost_frame[2], 
      'stack': failure.stack, 
      'parents': failure.parents, 
      'traceback': traceback, 
     } 

     return fields 

    def get_extra_fields(self, record): 
     fields = {} 
     if sys.version_info < (3, 0): 
      easy_types = (basestring, bool, dict, float, int, long, list, type(None)) 
     else: 
      easy_types = (str, bool, dict, float, int, list, type(None)) 

     if self.prefix is not None: 
      for key, value in record: 
       if not key.startswith(self.prefix): continue 
       if isinstance(value, easy_types): 
        fields[key] = value 
       else: 
        fields[key] = repr(value) 
     else: 

      # get every field that isn't prefixed with log_ 
      for key, value in record.items(): 
       if key.startswith('log_'): continue 
       if isinstance(value, easy_types): 
        fields[key] = value 
       else: 
        fields[key] = repr(value) 

     return fields 

    @classmethod 
    def format_source(cls, message_type, host, path): 
     return "%s://%s/%s" % (message_type, host, path) 

    @classmethod 
    def format_timestamp(cls, time): 
     tstamp = datetime.utcfromtimestamp(time) 
     return tstamp.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (tstamp.microsecond/1000) + "Z" 

    @classmethod 
    def get_namespace(cls, record): 
     if 'log_namespace' in record: 
      namespace = record['log_namespace'] 
     elif 'log_logger' in record: 
      namespace = record['log_logger'].namespace 
     else: 
      namespace = '(UNABLE TO OBTAIN THE NAMESPACE)' 
     return namespace 

    @classmethod 
    def serialize(cls, message): 
     if sys.version_info < (3, 0): 
      return json.dumps(message) 
     else: 
      return bytes(json.dumps(message), 'utf-8') 


class LogstashFormatterVersion0(LogstashBaseFormatter): 
    version = 0 

    def format(self, record): 
     # Create message dict 
     message = { 
      '@timestamp': self.format_timestamp(record['log_time']), 
      '@version': self.version, 
      'message': logger.formatEvent(record), 
      '@source': self.format_source(self.message_type, self.host, 
              record['stack'][-1][1]), 
      '@source_host': self.host, 
      '@source_path': record['log_stack'][-1][1], 
      '@tags': self.tags, 
      '@type': self.message_type, 
      '@fields': { 
       'levelname': record['log_level'].name, 
       'logger': self.get_namespace(record), 
      }, 
     } 

     # extra fields 
     message['@fields'].update(self.get_extra_fields(record)) 

     # exception infos 
     if 'log_failure' in record: 
      message['@fields'].update(self.get_debug_fields(record)) 

     return self.serialize(message) 


class LogstashFormatterVersion1(LogstashBaseFormatter): 
    version = 1 

    def format(self, record): 
     # Create message dict 
     message = { 
      '@timestamp': self.format_timestamp(record['log_time']), 
      '@version': self.version, 
      'message': logger.formatEvent(record), 
      'host': self.host, 
      'path': record['log_stack'][-1][1], 
      'tags': self.tags, 
      'type': self.message_type, 
      'levelname': record['log_level'].name, 
      'logger': self.get_namespace(record), 
     } 

     # extra fields 
     message.update(self.get_extra_fields(record)) 

     # exception infos 
     if 'log_failure' in record: 
      message.update(self.get_debug_fields(record)) 

     return self.serialize(message) 
+0

от anychance вы хотите pubilsh этот код на PyPI? – tardyp

Смежные вопросы