Мы собираем журналы CISE с наших устройств Cisco. Сообщение поступает в несколько частей. Может ли logstash повторно собрать их?Сообщение Syslog в нескольких частях Может ли logstash собрать его?
"message" => "<181>Dec 13 20:41:35 sfm-ise-psn1 CISE_Passed_Authentications 0069245712 3 1 IdentityGroup=Endpoint Identity Groups:SFDC-Assets:SFDC-AccessPoints, Step=11001, Step=11017, Step=11027, Step=15049, Step=15008, Step=15048, Step=15048, Step=15004, Step=15041, Step=15006, Step=15013, Step=24209, Step=24211, Step=22037, Step=15036, Step=15048, Step=15048, Step=15048, Step=15004, Step=15016, Step=11022, Step=11002, SelectedAuthenticationIdentityStores=Internal Endpoints, NetworkDeviceGroups=Stage#Stage#Low Impact Mode, NetworkDeviceGroups=Location#All Locations#SFDC#SFDC-Americas#SFDC-Americas-East#NY-New York NEW, NetworkDeviceGroups=Device Type#All Device Types#Switching, AuthorizationPolicyMatchedRule=SFDC Access Points, UserType=Host, CPMSessionID=000000000000000200016F9F, EndPointMACAddress=64-9E-F3-B3-5C-15, PostureAssessmentStatus=NotApplicable, EndPointMatchedProfile=Cisco-AP-Aironet-1140, DeviceRegistrationStatus=notRegistered, ISEPolicySetName=Default, AllowedProtocolMatchedRule=MAB,",
"message" => "<181>Dec 13 20:41:35 sfm-ise-psn1 CISE_Passed_Authentications 0069245712 3 2 IdentitySelectionMatchedRule=Default, HostIdentityGroup=Endpoint Identity Groups:SFDC-Assets:SFDC-AccessPoints, Stage=Stage#Stage#Low Impact Mode, Location=Location#All Locations#SFDC#SFDC-Americas#SFDC-Americas-East#NY-New York NEW, Device Type=Device Type#All Device Types#Switching, PostureStatus=Unknown, Response={UserName=64:9E:F3:B3:5C:15; User-Name=64-9E-F3-B3-5C-15; State=ReauthSession:000000000000000200016F9F; Class=CACS:000000000000000200016F9F:sfm-ise-psn1/228424214/80382355; Session-Timeout=14400; Termination-Action=RADIUS-Request; cisco-av-pair=ACS:CiscoSecure-Defined-ACL=#ACSACL#-IP-PERMIT_ALL_TRAFFIC-5165e13c; cisco-av-pair=profile-name=Cisco-AP-Aironet-1140; },",
"message" => "<181>Dec 13 20:41:35 sfm-ise-psn1 CISE_Passed_Authentications 0069245712 3 0 2015-12-13 20:41:35.925 +00:00 9534852286 5200 NOTICE Passed-Authentication: Authentication succeeded, ConfigVersionId=61, Device IP Address=10.119.68.254, DestinationIPAddress=10.1.64.13, DestinationPort=1812, UserName=64-9E-F3-B3-5C-15, Protocol=Radius, RequestLatency=7, NetworkDeviceName=Melville-3750_copy, User-Name=649ef3b35c15, NAS-IP-Address=10.119.68.254, NAS-Port=50122, Service-Type=Call Check, Framed-IP-Address=10.119.68.3, Framed-MTU=1500, Called-Station-ID=28-94-0F-34-AC-16, Calling-Station-ID=64-9E-F3-B3-5C-15, NAS-Port-Type=Ethernet, NAS-Port-Id=GigabitEthernet1/0/22, EAP-Key-Name=, cisco-av-pair=audit-session-id=000000000000000200016F9F, OriginalUserName=649ef3b35c15, AcsSessionID=sfm-ise-psn1/228424214/80382355, AuthenticationIdentityStore=Internal Endpoints, AuthenticationMethod=Lookup, SelectedAccessService=Default Network Access, SelectedAuthorizationProfiles=PermitAccess-SFDC-AP, UseCase=Host Lookup,",
Посмотрев на Агрегатный плагин, я чувствую, что приближаюсь. Сообщение еще не собирается в виде одной записи.
if [message] =~ /^<181>/ {
grok {
match => { "message" => "%{SYSLOG5424PRI:pri}%{CISCOTIMESTAMP:time} %{IPORHOST:hostname} %{WORD:type} %{INT:taskid} %{INT:duration:int} %{INT:order:int} "}
}
if [order] == "0" {
aggregate {
task_id => "%{taskid}"
code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event['duration']"
}
}
if [order] == "%{duration}" {
aggregate {
task_id => "%{taskid}"
code => "event['sql_duration'] = map['sql_duration']"
end_of_task => true
timeout => 120
}
}
kv {
type => syslog
add_field => { "log_type" => "CISE" }
}
}
Возможно, это будет возможно с помощью фильтра «aggregate»: https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html –