@@ -789,6 +789,10 @@ func (conn *Connection) writer(w writeFlusher, c Conn) {
789
789
runtime .Gosched ()
790
790
if len (conn .dirtyShard ) == 0 {
791
791
if err := w .Flush (); err != nil {
792
+ err = ClientError {
793
+ ErrIoError ,
794
+ fmt .Sprintf ("failed to flush data to the connection: %s" , err ),
795
+ }
792
796
conn .reconnect (err , c )
793
797
return
794
798
}
@@ -812,6 +816,10 @@ func (conn *Connection) writer(w writeFlusher, c Conn) {
812
816
continue
813
817
}
814
818
if _ , err := w .Write (packet .b ); err != nil {
819
+ err = ClientError {
820
+ ErrIoError ,
821
+ fmt .Sprintf ("failed to write data to the connection: %s" , err ),
822
+ }
815
823
conn .reconnect (err , c )
816
824
return
817
825
}
@@ -868,12 +876,20 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
868
876
for atomic .LoadUint32 (& conn .state ) != connClosed {
869
877
respBytes , err := read (r , conn .lenbuf [:])
870
878
if err != nil {
879
+ err = ClientError {
880
+ ErrIoError ,
881
+ fmt .Sprintf ("failed to read data from the connection: %s" , err ),
882
+ }
871
883
conn .reconnect (err , c )
872
884
return
873
885
}
874
886
resp := & Response {buf : smallBuf {b : respBytes }}
875
887
err = resp .decodeHeader (conn .dec )
876
888
if err != nil {
889
+ err = ClientError {
890
+ ErrProtocolError ,
891
+ fmt .Sprintf ("failed to decode IPROTO header: %s" , err ),
892
+ }
877
893
conn .reconnect (err , c )
878
894
return
879
895
}
@@ -883,6 +899,10 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
883
899
if event , err := readWatchEvent (& resp .buf ); err == nil {
884
900
events <- event
885
901
} else {
902
+ err = ClientError {
903
+ ErrProtocolError ,
904
+ fmt .Sprintf ("failed to decode IPROTO_EVENT: %s" , err ),
905
+ }
886
906
conn .opts .Logger .Report (LogWatchEventReadFailed , conn , err )
887
907
}
888
908
continue
0 commit comments