From 1e35527667ae1897340cd14be42478ad6ba38b50 Mon Sep 17 00:00:00 2001 From: rachelc Date: Sun, 23 Oct 2022 13:26:43 +0300 Subject: [PATCH 1/2] support_flow_sample_expanded_and_counter_sample_expanded --- sflow/decoder.go | 22 ++++++++++++++++++---- sflow/flow_counter.go | 34 ++++++++++++++++++++++------------ sflow/flow_sample.go | 28 ++++++++++++++++++++-------- 3 files changed, 60 insertions(+), 24 deletions(-) diff --git a/sflow/decoder.go b/sflow/decoder.go index fc34b59e..1944eb3e 100644 --- a/sflow/decoder.go +++ b/sflow/decoder.go @@ -33,9 +33,11 @@ import ( const ( // DataFlowSample defines packet flow sampling DataFlowSample = 1 + DataFlowSampleExpanded = 3 // DataCounterSample defines counter sampling DataCounterSample = 2 + DataCounterSampleExpanded = 4 ) // SFDecoder represents sFlow decoder @@ -114,17 +116,29 @@ func (d *SFDecoder) SFDecode() (*SFDatagram, error) { switch sfTypeFormat { case DataFlowSample: - d, err := decodeFlowSample(d.reader) + f, err := decodeFlowSample(d.reader, false) if err != nil { return datagram, err } - datagram.Samples = append(datagram.Samples, d) + datagram.Samples = append(datagram.Samples, f) case DataCounterSample: - d, err := decodeFlowCounter(d.reader) + f, err := decodeFlowCounter(d.reader, false) if err != nil { return datagram, err } - datagram.Counters = append(datagram.Counters, d) + datagram.Counters = append(datagram.Counters, f) + case DataFlowSampleExpanded: + f, err := decodeFlowSample(d.reader, true) + if err != nil { + return datagram, err + } + datagram.Samples = append(datagram.Samples, f) + case DataCounterSampleExpanded: + f, err := decodeFlowCounter(d.reader, true) + if err != nil { + return datagram, err + } + datagram.Counters = append(datagram.Counters, f) default: d.reader.Seek(int64(sfDataLength), 1) } diff --git a/sflow/flow_counter.go b/sflow/flow_counter.go index eeb172e6..c338dc55 100644 --- a/sflow/flow_counter.go +++ b/sflow/flow_counter.go @@ -146,13 +146,13 @@ type ProcessorCounters struct { // CounterSample represents the periodic sampling or polling of counters associated with a Data Source type CounterSample struct { SequenceNo uint32 - SourceIDType byte + SourceIDType uint32 SourceIDIdx uint32 RecordsNo uint32 Records map[string]Record } -func decodeFlowCounter(r io.ReadSeeker) (*CounterSample, error) { +func decodeFlowCounter(r io.ReadSeeker, expanded bool) (*CounterSample, error) { var ( cs = new(CounterSample) rTypeFormat uint32 @@ -160,7 +160,7 @@ func decodeFlowCounter(r io.ReadSeeker) (*CounterSample, error) { err error ) - if err = cs.unmarshal(r); err != nil { + if err = cs.unmarshal(r, expanded); err != nil { return nil, err } @@ -441,7 +441,7 @@ func (pc *ProcessorCounters) unmarshal(r io.Reader) error { return nil } -func (cs *CounterSample) unmarshal(r io.Reader) error { +func (cs *CounterSample) unmarshal(r io.Reader, expanded bool) error { var err error @@ -449,15 +449,25 @@ func (cs *CounterSample) unmarshal(r io.Reader) error { return err } - if err = read(r, &cs.SourceIDType); err != nil { - return err - } - - buf := make([]byte, 3) - if err = read(r, &buf); err != nil { - return err + if expanded { + if err = read(r, &cs.SourceIDType); err != nil { + return err + } + if err = read(r, &cs.SourceIDIdx); err != nil { + return err + } + } else { + buf := make([]byte, 1) + if err = read(r, &buf); err != nil { + return err + } + cs.SourceIDType = uint32(buf[0]) + buf = make([]byte, 3) + if err = read(r, &buf); err != nil { + return err + } + cs.SourceIDIdx = uint32(buf[2]) | uint32(buf[1])<<8 | uint32(buf[0])<<16 } - cs.SourceIDIdx = uint32(buf[2]) | uint32(buf[1])<<8 | uint32(buf[0])<<16 err = read(r, &cs.RecordsNo) diff --git a/sflow/flow_sample.go b/sflow/flow_sample.go index 0a284673..232bc909 100644 --- a/sflow/flow_sample.go +++ b/sflow/flow_sample.go @@ -44,7 +44,7 @@ const ( // FlowSample represents single flow sample type FlowSample struct { SequenceNo uint32 // Incremented with each flow sample - SourceID byte // sfSourceID + SourceID uint32 // sfSourceID SamplingRate uint32 // sfPacketSamplingRate SamplePool uint32 // Total number of packets that could have been sampled Drops uint32 // Number of times a packet was dropped due to lack of resources @@ -82,19 +82,27 @@ var ( errMaxOutEthernetLength = errors.New("the ethernet length is greater than 1500") ) -func (fs *FlowSample) unmarshal(r io.ReadSeeker) error { +func (fs *FlowSample) unmarshal(r io.ReadSeeker, expanded bool) error { var err error if err = read(r, &fs.SequenceNo); err != nil { return err } - if err = read(r, &fs.SourceID); err != nil { - return err + if expanded { + if err = read(r, &fs.SourceID); err != nil { + return err + } + r.Seek(4, 1) // skip counter sample decoding + } else { + buf := make([]byte, 1) + if err = read(r, &buf); err != nil { + return err + } + fs.SourceID = uint32(buf[0]) + r.Seek(3, 1) // skip counter sample decoding } - r.Seek(3, 1) // skip counter sample decoding - if err = read(r, &fs.SamplingRate); err != nil { return err } @@ -107,10 +115,14 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker) error { return err } + if expanded { r.Seek(4, 1) } // skip Input interface format + if err = read(r, &fs.Input); err != nil { return err } + if expanded { r.Seek(4, 1) } // skip Output interface format + if err = read(r, &fs.Output); err != nil { return err } @@ -197,7 +209,7 @@ func (er *ExtRouterData) unmarshal(r io.Reader, l uint32) error { return err } -func decodeFlowSample(r io.ReadSeeker) (*FlowSample, error) { +func decodeFlowSample(r io.ReadSeeker, expanded bool) (*FlowSample, error) { var ( fs = new(FlowSample) rTypeFormat uint32 @@ -205,7 +217,7 @@ func decodeFlowSample(r io.ReadSeeker) (*FlowSample, error) { err error ) - if err = fs.unmarshal(r); err != nil { + if err = fs.unmarshal(r, expanded); err != nil { return nil, err } From 17b79d026a15662021b07d4a7e7e1c47f26750ed Mon Sep 17 00:00:00 2001 From: rachelc Date: Mon, 24 Oct 2022 10:46:08 +0300 Subject: [PATCH 2/2] support_flow_sample_expanded_and_counter_sample_expanded --- sflow/decoder.go | 16 ++--- sflow/flow_sample.go | 148 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 138 insertions(+), 26 deletions(-) diff --git a/sflow/decoder.go b/sflow/decoder.go index 1944eb3e..4930704d 100644 --- a/sflow/decoder.go +++ b/sflow/decoder.go @@ -116,29 +116,29 @@ func (d *SFDecoder) SFDecode() (*SFDatagram, error) { switch sfTypeFormat { case DataFlowSample: - f, err := decodeFlowSample(d.reader, false) + d, err := decodeFlowSample(d.reader) if err != nil { return datagram, err } - datagram.Samples = append(datagram.Samples, f) + datagram.Samples = append(datagram.Samples, d) case DataCounterSample: - f, err := decodeFlowCounter(d.reader, false) + d, err := decodeFlowCounter(d.reader, false) if err != nil { return datagram, err } - datagram.Counters = append(datagram.Counters, f) + datagram.Counters = append(datagram.Counters, d) case DataFlowSampleExpanded: - f, err := decodeFlowSample(d.reader, true) + d, err := decodeFlowSampleExpand(d.reader) if err != nil { return datagram, err } - datagram.Samples = append(datagram.Samples, f) + datagram.Samples = append(datagram.Samples, d) case DataCounterSampleExpanded: - f, err := decodeFlowCounter(d.reader, true) + d, err := decodeFlowCounter(d.reader, true) if err != nil { return datagram, err } - datagram.Counters = append(datagram.Counters, f) + datagram.Counters = append(datagram.Counters, d) default: d.reader.Seek(int64(sfDataLength), 1) } diff --git a/sflow/flow_sample.go b/sflow/flow_sample.go index 232bc909..bfe47b17 100644 --- a/sflow/flow_sample.go +++ b/sflow/flow_sample.go @@ -54,6 +54,29 @@ type FlowSample struct { Records map[string]Record } +type sflowDataSourceExpand struct{ + sourceIdType uint32; /* sFlowDataSource type */ + sourceIdIndex uint32; /* sFlowDataSource index */ +} + +type interfaceExpand struct{ + format uint32; /* interface format */ + value uint32; /* interface value */ +} + +// FlowSampleExpand represents single flow sample expand +type FlowSampleExpand struct { + SequenceNo uint32 // Incremented with each flow sample + SourceID sflowDataSourceExpand // sfSourceID + SamplingRate uint32 // sfPacketSamplingRate + SamplePool uint32 // Total number of packets that could have been sampled + Drops uint32 // Number of times a packet was dropped due to lack of resources + Input interfaceExpand // SNMP ifIndex of input interface + Output interfaceExpand // SNMP ifIndex of input interface + RecordsNo uint32 // Number of records to follow + Records map[string]Record +} + // SampledHeader represents sampled header type SampledHeader struct { Protocol uint32 // (enum SFLHeader_protocol) @@ -82,26 +105,20 @@ var ( errMaxOutEthernetLength = errors.New("the ethernet length is greater than 1500") ) -func (fs *FlowSample) unmarshal(r io.ReadSeeker, expanded bool) error { +func (fs *FlowSample) unmarshal(r io.ReadSeeker) error { var err error if err = read(r, &fs.SequenceNo); err != nil { return err } - if expanded { - if err = read(r, &fs.SourceID); err != nil { - return err - } - r.Seek(4, 1) // skip counter sample decoding - } else { - buf := make([]byte, 1) - if err = read(r, &buf); err != nil { + buf := make([]byte, 1) + if err = read(r, &buf); err != nil { return err - } - fs.SourceID = uint32(buf[0]) - r.Seek(3, 1) // skip counter sample decoding } + fs.SourceID = uint32(buf[0]) + r.Seek(3, 1) // skip unused bytes + if err = read(r, &fs.SamplingRate); err != nil { return err @@ -115,14 +132,10 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker, expanded bool) error { return err } - if expanded { r.Seek(4, 1) } // skip Input interface format - if err = read(r, &fs.Input); err != nil { return err } - if expanded { r.Seek(4, 1) } // skip Output interface format - if err = read(r, &fs.Output); err != nil { return err } @@ -132,6 +145,54 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker, expanded bool) error { return err } +func (fs *FlowSampleExpand) unmarshalExpand(r io.ReadSeeker) error { + var err error + + if err = read(r, &fs.SequenceNo); err != nil { + return err + } + + if err = read(r, &fs.SourceID.sourceIdType); err != nil { + return err + } + + if err = read(r, &fs.SourceID.sourceIdType); err != nil { + return err + } + + if err = read(r, &fs.SamplingRate); err != nil { + return err + } + + if err = read(r, &fs.SamplePool); err != nil { + return err + } + + if err = read(r, &fs.Drops); err != nil { + return err + } + + if err = read(r, &fs.Input.format); err != nil { + return err + } + + if err = read(r, &fs.Input.value); err != nil { + return err + } + + if err = read(r, &fs.Output.format); err != nil { + return err + } + + if err = read(r, &fs.Output.value); err != nil { + return err + } + + err = read(r, &fs.RecordsNo) + + return err +} + func (sh *SampledHeader) unmarshal(r io.Reader) error { var err error @@ -209,7 +270,7 @@ func (er *ExtRouterData) unmarshal(r io.Reader, l uint32) error { return err } -func decodeFlowSample(r io.ReadSeeker, expanded bool) (*FlowSample, error) { +func decodeFlowSample(r io.ReadSeeker) (*FlowSample, error) { var ( fs = new(FlowSample) rTypeFormat uint32 @@ -217,7 +278,58 @@ func decodeFlowSample(r io.ReadSeeker, expanded bool) (*FlowSample, error) { err error ) - if err = fs.unmarshal(r, expanded); err != nil { + if err = fs.unmarshal(r); err != nil { + return nil, err + } + + fs.Records = make(map[string]Record) + + for i := uint32(0); i < fs.RecordsNo; i++ { + if err = read(r, &rTypeFormat); err != nil { + return nil, err + } + if err = read(r, &rTypeLength); err != nil { + return nil, err + } + + switch rTypeFormat { + case SFDataRawHeader: + d, err := decodeSampledHeader(r) + if err != nil { + return fs, err + } + fs.Records["RawHeader"] = d + case SFDataExtSwitch: + d, err := decodeExtSwitchData(r) + if err != nil { + return fs, err + } + + fs.Records["ExtSwitch"] = d + case SFDataExtRouter: + d, err := decodeExtRouterData(r, rTypeLength) + if err != nil { + return fs, err + } + + fs.Records["ExtRouter"] = d + default: + r.Seek(int64(rTypeLength), 1) + } + } + + return fs, nil +} + +func decodeFlowSampleExpand(r io.ReadSeeker) (*FlowSampleExpand, error) { + var ( + fs = new(FlowSampleExpand) + rTypeFormat uint32 + rTypeLength uint32 + err error + ) + + if err = fs.unmarshalExpand(r); err != nil { return nil, err }