@@ -641,7 +641,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
641
641
case head
642
642
case redirected( HTTPResponseHead , URL )
643
643
case body
644
- case end
644
+ case endOrError
645
645
}
646
646
647
647
let task : HTTPClient . Task < Delegate . Response >
@@ -651,6 +651,8 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
651
651
let logger : Logger // We are okay to store the logger here because a TaskHandler is just for one request.
652
652
653
653
var state : State = . idle
654
+ var expectedBodyLength : Int ?
655
+ var actualBodyLength : Int = 0
654
656
var pendingRead = false
655
657
var mayRead = true
656
658
var closing = false {
@@ -785,7 +787,7 @@ extension TaskHandler: ChannelDuplexHandler {
785
787
} catch {
786
788
promise? . fail ( error)
787
789
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
788
- self . state = . end
790
+ self . state = . endOrError
789
791
return
790
792
}
791
793
@@ -799,12 +801,23 @@ extension TaskHandler: ChannelDuplexHandler {
799
801
assert ( head. version == HTTPVersion ( major: 1 , minor: 1 ) ,
800
802
" Sending a request in HTTP version \( head. version) which is unsupported by the above `if` " )
801
803
804
+ let contentLengths = head. headers [ canonicalForm: " content-length " ]
805
+ assert ( contentLengths. count <= 1 )
806
+
807
+ self . expectedBodyLength = contentLengths. first. flatMap { Int ( $0) }
808
+
802
809
context. write ( wrapOutboundOut ( . head( head) ) ) . map {
803
810
self . callOutToDelegateFireAndForget ( value: head, self . delegate. didSendRequestHead)
804
811
} . flatMap {
805
812
self . writeBody ( request: request, context: context)
806
813
} . flatMap {
807
814
context. eventLoop. assertInEventLoop ( )
815
+ if let expectedBodyLength = self . expectedBodyLength, expectedBodyLength != self . actualBodyLength {
816
+ self . state = . endOrError
817
+ let error = HTTPClientError . bodyLengthMismatch
818
+ self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
819
+ return context. eventLoop. makeFailedFuture ( error)
820
+ }
808
821
return context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) )
809
822
} . map {
810
823
context. eventLoop. assertInEventLoop ( )
@@ -813,10 +826,10 @@ extension TaskHandler: ChannelDuplexHandler {
813
826
} . flatMapErrorThrowing { error in
814
827
context. eventLoop. assertInEventLoop ( )
815
828
switch self . state {
816
- case . end :
829
+ case . endOrError :
817
830
break
818
831
default :
819
- self . state = . end
832
+ self . state = . endOrError
820
833
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
821
834
}
822
835
throw error
@@ -833,9 +846,11 @@ extension TaskHandler: ChannelDuplexHandler {
833
846
let promise = self . task. eventLoop. makePromise ( of: Void . self)
834
847
// All writes have to be switched to the channel EL if channel and task ELs differ
835
848
if context. eventLoop. inEventLoop {
849
+ self . actualBodyLength += part. readableBytes
836
850
context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
837
851
} else {
838
852
context. eventLoop. execute {
853
+ self . actualBodyLength += part. readableBytes
839
854
context. writeAndFlush ( self . wrapOutboundOut ( . body( part) ) , promise: promise)
840
855
}
841
856
}
@@ -898,12 +913,12 @@ extension TaskHandler: ChannelDuplexHandler {
898
913
case . end:
899
914
switch self . state {
900
915
case . redirected( let head, let redirectURL) :
901
- self . state = . end
916
+ self . state = . endOrError
902
917
self . task. releaseAssociatedConnection ( delegateType: Delegate . self, closing: self . closing) . whenSuccess {
903
918
self . redirectHandler? . redirect ( status: head. status, to: redirectURL, promise: self . task. promise)
904
919
}
905
920
default :
906
- self . state = . end
921
+ self . state = . endOrError
907
922
self . callOutToDelegate ( promise: self . task. promise, self . delegate. didFinishRequest)
908
923
}
909
924
}
@@ -918,14 +933,14 @@ extension TaskHandler: ChannelDuplexHandler {
918
933
context. read ( )
919
934
}
920
935
case . failure( let error) :
921
- self . state = . end
936
+ self . state = . endOrError
922
937
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
923
938
}
924
939
}
925
940
926
941
func userInboundEventTriggered( context: ChannelHandlerContext , event: Any ) {
927
942
if ( event as? IdleStateHandler . IdleStateEvent) == . read {
928
- self . state = . end
943
+ self . state = . endOrError
929
944
let error = HTTPClientError . readTimeout
930
945
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
931
946
} else {
@@ -935,7 +950,7 @@ extension TaskHandler: ChannelDuplexHandler {
935
950
936
951
func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
937
952
if ( event as? TaskCancelEvent ) != nil {
938
- self . state = . end
953
+ self . state = . endOrError
939
954
let error = HTTPClientError . cancelled
940
955
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
941
956
promise? . succeed ( ( ) )
@@ -946,10 +961,10 @@ extension TaskHandler: ChannelDuplexHandler {
946
961
947
962
func channelInactive( context: ChannelHandlerContext ) {
948
963
switch self . state {
949
- case . end :
964
+ case . endOrError :
950
965
break
951
966
case . body, . head, . idle, . redirected, . sent:
952
- self . state = . end
967
+ self . state = . endOrError
953
968
let error = HTTPClientError . remoteConnectionClosed
954
969
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
955
970
}
@@ -960,7 +975,7 @@ extension TaskHandler: ChannelDuplexHandler {
960
975
switch error {
961
976
case NIOSSLError . uncleanShutdown:
962
977
switch self . state {
963
- case . end :
978
+ case . endOrError :
964
979
/// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection,
965
980
/// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error.
966
981
break
@@ -969,11 +984,11 @@ extension TaskHandler: ChannelDuplexHandler {
969
984
/// We can also ignore this error like `.end`.
970
985
break
971
986
default :
972
- self . state = . end
987
+ self . state = . endOrError
973
988
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
974
989
}
975
990
default :
976
- self . state = . end
991
+ self . state = . endOrError
977
992
self . failTaskAndNotifyDelegate ( error: error, self . delegate. didReceiveError)
978
993
}
979
994
}
0 commit comments