44 changes: 24 additions & 20 deletions rekcurd/core/rekcurd_dashboard_servicer.py
@@ -159,7 +159,8 @@ def EvaluateModel(self,
         result_path = first_req.result_path

         local_data_path = self.rekcurd_pack.app.data_server.get_evaluation_data_path(data_path)
-        result, details = self.rekcurd_pack.app.evaluate(self.rekcurd_pack.predictor, local_data_path)
+        evaluate_result_gen = self.rekcurd_pack.app.evaluate(self.rekcurd_pack.predictor, local_data_path)
+        result = self.rekcurd_pack.app.data_server.upload_evaluation_result(evaluate_result_gen, result_path)
         label_ios = [self.get_io_by_type(l) for l in result.label]
         metrics = rekcurd_pb2.EvaluationMetrics(num=result.num,
                                                 accuracy=result.accuracy,
@@ -168,8 +169,6 @@ def EvaluateModel(self,
                                                 fvalue=result.fvalue,
                                                 option=result.option,
                                                 label=label_ios)
-        self.rekcurd_pack.app.data_server.upload_evaluation_result_summary(result, result_path)
-        self.rekcurd_pack.app.data_server.upload_evaluation_result_detail(details, result_path)
         return rekcurd_pb2.EvaluateModelResponse(metrics=metrics)

     @error_handling(rekcurd_pb2.UploadEvaluationDataResponse(status=0, message='Error: Uploading evaluation data.'))
@@ -202,8 +201,6 @@ def EvaluationResult(self,
         local_result_summary_path = self.rekcurd_pack.app.data_server.get_eval_result_summary(result_path)
         local_result_detail_path = self.rekcurd_pack.app.data_server.get_eval_result_detail(result_path)

-        with open(local_result_detail_path, 'rb') as f:
-            result_details = pickle.load(f)
         with open(local_result_summary_path, 'rb') as f:
             result = pickle.load(f)
         label_ios = [self.get_io_by_type(l) for l in result.label]
@@ -218,21 +215,28 @@
         detail_chunks = []
         detail_chunk = []
         metrics_size = sys.getsizeof(metrics)
-        for detail in self.rekcurd_pack.app.get_evaluate_detail(local_data_path, result_details):
-            detail_chunk.append(rekcurd_pb2.EvaluationResultResponse.Detail(
-                input=self.get_io_by_type(detail.input),
-                label=self.get_io_by_type(detail.label),
-                output=self.get_io_by_type(detail.result.result.label),
-                score=self.get_score_by_type(detail.result.result.score),
-                is_correct=detail.result.is_correct
-            ))
-            if len(detail_chunk) == self.CHUNK_SIZE:
-                if metrics_size + sys.getsizeof(detail_chunk + detail_chunks) < self.BYTE_LIMIT:
-                    detail_chunks.extend(detail_chunk)
-                else:
-                    yield rekcurd_pb2.EvaluationResultResponse(metrics=metrics, detail=detail_chunks)
-                    detail_chunks = detail_chunk
-                detail_chunk = []
+        with open(local_result_detail_path, 'rb') as detail_file:
+            def generate_result_detail():
+                try:
+                    while True:
+                        yield pickle.load(detail_file)
+                except EOFError:
+                    pass
+            for detail in self.rekcurd_pack.app.get_evaluate_detail(local_data_path, generate_result_detail()):
+                detail_chunk.append(rekcurd_pb2.EvaluationResultResponse.Detail(
+                    input=self.get_io_by_type(detail.input),
+                    label=self.get_io_by_type(detail.label),
+                    output=self.get_io_by_type(detail.result.result.label),
+                    score=self.get_score_by_type(detail.result.result.score),
+                    is_correct=detail.result.is_correct
+                ))
+                if len(detail_chunk) == self.CHUNK_SIZE:
+                    if metrics_size + sys.getsizeof(detail_chunk + detail_chunks) < self.BYTE_LIMIT:
+                        detail_chunks.extend(detail_chunk)
+                    else:
+                        yield rekcurd_pb2.EvaluationResultResponse(metrics=metrics, detail=detail_chunks)
+                        detail_chunks = detail_chunk
+                    detail_chunk = []

         if len(detail_chunks + detail_chunk) > 0:
             if metrics_size + sys.getsizeof(detail_chunk + detail_chunks) < self.BYTE_LIMIT:
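The rewritten EvaluationResult above no longer unpickles one large list of details. Because upload_evaluation_result writes each detail as its own pickle frame, the servicer streams them back with repeated pickle.load calls until EOFError. Below is a minimal standalone sketch of that write-then-stream pattern; Record, the file name and the helper names are illustrative stand-ins, not part of this PR.

    import pickle
    from typing import Iterator, NamedTuple

    class Record(NamedTuple):
        label: str
        is_correct: bool

    def write_records(path: str, records) -> None:
        # Successive pickle.dump calls append independent frames to one file.
        with open(path, 'wb') as f:
            for record in records:
                pickle.dump(record, f)

    def read_records(path: str) -> Iterator[Record]:
        # Each pickle.load consumes exactly one frame; EOFError marks the end,
        # so the full detail file never has to sit in memory at once.
        with open(path, 'rb') as f:
            try:
                while True:
                    yield pickle.load(f)
            except EOFError:
                return

    write_records('details.pkl', [Record('a', True), Record('b', False)])
    for record in read_records('details.pkl'):
        print(record)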
10 changes: 5 additions & 5 deletions rekcurd/core/rekcurd_worker.py
@@ -4,7 +4,7 @@

 from abc import ABCMeta, abstractmethod
 from enum import Enum
-from typing import List, Tuple, Generator
+from typing import Generator

 from rekcurd.utils import RekcurdConfig, PredictInput, PredictResult, EvaluateResult, EvaluateDetail, EvaluateResultDetail
 from rekcurd.logger import SystemLoggerInterface, ServiceLoggerInterface, JsonSystemLogger, JsonServiceLogger
@@ -44,7 +44,7 @@ def predict(self, predictor: object, idata: PredictInput, option: dict = None) -
         raise NotImplemented()

     @abstractmethod
-    def evaluate(self, predictor: object, filepath: str) -> Tuple[EvaluateResult, List[EvaluateResultDetail]]:
+    def evaluate(self, predictor: object, filepath: str) -> Generator[EvaluateResultDetail, None, EvaluateResult]:
         """
         evaluate
         :param predictor: Your ML predictor object. object
@@ -56,18 +56,18 @@ def evaluate(self, predictor: object, filepath: str) -> Tuple[EvaluateResult, Li
             result.recall: Recall. arr[float]
             result.fvalue: F1 value. arr[float]
             result.option: Optional metrics. dict[str, float]
-        :return detail[]: Detail result of each prediction. List[EvaluateResultDetail]
+        :generate detail[]: Detail result of each prediction. List[EvaluateResultDetail]
             detail[].result: Prediction result. PredictResult
             detail[].is_correct: Prediction result is correct or not. bool
         """
         raise NotImplemented()

     @abstractmethod
-    def get_evaluate_detail(self, filepath: str, details: List[EvaluateResultDetail]) -> Generator[EvaluateDetail, None, None]:
+    def get_evaluate_detail(self, filepath: str, details: Generator[EvaluateResultDetail, None, None]) -> Generator[EvaluateDetail, None, None]:
         """
         get_evaluate_detail
         :param filepath: Evaluation data file path. str
-        :param details: Detail result of each prediction. List[EvaluateResultDetail]
+        :param details: Detail result of each prediction. Generator[EvaluateResultDetail, None, None]
         :return rtn: Return results. Generator[EvaluateDetail, None, None]
             rtn.input: Input data. PredictInput, one of string/bytes/arr[int]/arr[float]/arr[string]
             rtn.label: Predict label. PredictLabel, one of string/bytes/arr[int]/arr[float]/arr[string]
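Under the new contract, evaluate yields one EvaluateResultDetail per example and returns the EvaluateResult summary when the generator finishes. The sketch below is a hypothetical implementation against that signature: the tab-separated file layout and the predictor.predict call are assumptions, and the EvaluateResult construction simply mirrors the positional form used in the updated test_data_server.py further down.

    from typing import Generator

    from rekcurd.utils import EvaluateResult, EvaluateResultDetail, PredictResult

    def evaluate(predictor, filepath: str) -> Generator[EvaluateResultDetail, None, EvaluateResult]:
        # Shown as a free function for brevity; in an app this overrides Rekcurd.evaluate.
        num = 0
        correct = 0
        labels = set()
        with open(filepath) as f:
            for line in f:
                label, data = line.rstrip('\n').split('\t', 1)   # assumed file layout
                predicted = predictor.predict(data)               # assumed predictor API
                is_correct = (predicted == label)
                num += 1
                correct += int(is_correct)
                labels.add(label)
                # Each detail is yielded immediately instead of being buffered in a list.
                yield EvaluateResultDetail(result=PredictResult(predicted, 1.0),
                                           is_correct=is_correct)
        accuracy = correct / num if num else 0.0
        # The summary travels back as the generator's return value and reaches the
        # caller as StopIteration.value (see upload_evaluation_result below).
        return EvaluateResult(num, accuracy, [accuracy], [accuracy], [accuracy],
                              [accuracy], sorted(labels))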
35 changes: 21 additions & 14 deletions rekcurd/data_servers/__init__.py
@@ -4,11 +4,11 @@
 import pickle

 from pathlib import Path
-from typing import Iterator
+from typing import Iterator, Generator

 from rekcurd.protobuf import rekcurd_pb2
-from rekcurd.utils import RekcurdConfig, ModelModeEnum
-from .data_handler import DataHandler, convert_to_valid_path
+from rekcurd.utils import RekcurdConfig, ModelModeEnum, EvaluateResultDetail, EvaluateResult
+from .data_handler import convert_to_valid_path
 from .local_handler import LocalHandler
 from .ceph_handler import CephHandler
 from .aws_s3_handler import AwsS3Handler
@@ -115,19 +115,26 @@ def upload_evaluation_data(self, request_iterator: Iterator[rekcurd_pb2.UploadEv
             self._api_handler.upload(filepath, str(local_filepath))
         return str(local_filepath)

-    def _upload_evaluation_result(self, data: object, filepath: str, suffix: str):
+    def upload_evaluation_result(self, data_gen: Generator[EvaluateResultDetail, None, EvaluateResult], filepath: str) -> EvaluateResult:
         valid_path = convert_to_valid_path(filepath)
         if filepath != str(valid_path):
             raise Exception(f'Error: Invalid evaluation result file path specified -> {filepath}')
-        local_filepath = Path(self._api_handler.LOCAL_EVAL_DIR, valid_path.name+suffix)
-        local_filepath.parent.mkdir(parents=True, exist_ok=True)
-        with local_filepath.open(mode='wb') as f:
-            pickle.dump(data, f)
-        self._api_handler.upload(filepath+suffix, str(local_filepath))
-        return str(local_filepath)
-
-    def upload_evaluation_result_summary(self, data: object, filepath: str) -> str:
-        return self._upload_evaluation_result(data, filepath, self.__EVALUATE_RESULT)
+        local_dir = Path(self._api_handler.LOCAL_EVAL_DIR)
+        local_dir.mkdir(parents=True, exist_ok=True)
+
+        detail_local_filepath = local_dir / (valid_path.name+self.__EVALUATE_DETAIL)
+        with detail_local_filepath.open(mode='wb') as detail_file:
+            try:
+                while True:
+                    pickle.dump(next(data_gen), detail_file)
+            except StopIteration as e:
+                result_local_filepath = local_dir / (valid_path.name+self.__EVALUATE_RESULT)
+                evaluate_result = e.value
+                with result_local_filepath.open(mode='wb') as f:
+                    pickle.dump(evaluate_result, f)
+                self._api_handler.upload(filepath+self.__EVALUATE_RESULT, str(result_local_filepath))
+
+        self._api_handler.upload(filepath+self.__EVALUATE_DETAIL, str(detail_local_filepath))

-    def upload_evaluation_result_detail(self, data: object, filepath: str) -> str:
-        return self._upload_evaluation_result(data, filepath, self.__EVALUATE_DETAIL)
+        return evaluate_result
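upload_evaluation_result drives the generator by hand with next() so that each detail can be pickled as soon as it is produced, and it still recovers the summary because a generator's return value is delivered as the value attribute of the StopIteration raised once the generator is exhausted. A short standalone illustration of that language mechanism (the names here are arbitrary):

    from typing import Generator

    def count_up(n: int) -> Generator[int, None, str]:
        for i in range(n):
            yield i
        return f'finished after {n} items'    # becomes StopIteration.value

    gen = count_up(3)
    try:
        while True:
            print(next(gen))                   # prints 0, 1, 2
    except StopIteration as e:
        summary = e.value                      # 'finished after 3 items'
        print(summary)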
6 changes: 3 additions & 3 deletions rekcurd/template/app.py-tpl
@@ -4,7 +4,7 @@
 # Example is available on https://github.com/rekcurd/rekcurd-example/tree/master/python/sklearn-digits


-from typing import Tuple, List, Generator
+from typing import Generator

 from rekcurd import Rekcurd
 from rekcurd.utils import PredictInput, PredictResult, EvaluateResult, EvaluateDetail, EvaluateResultDetail
@@ -32,7 +32,7 @@ class RekcurdAppTemplateApp(Rekcurd):
         """
         pass

-    def evaluate(self, predictor: object, filepath: str) -> Tuple[EvaluateResult, List[EvaluateResultDetail]]:
+    def evaluate(self, predictor: object, filepath: str) -> Generator[EvaluateResultDetail, None, EvaluateResult]:
         """ override
         TODO: Implement "evaluate"
         :param predictor: Your ML predictor object. object
@@ -50,7 +50,7 @@ class RekcurdAppTemplateApp(Rekcurd):
         """
         pass

-    def get_evaluate_detail(self, filepath: str, details: List[EvaluateResultDetail]) -> Generator[EvaluateDetail, None, None]:
+    def get_evaluate_detail(self, filepath: str, details: Generator[EvaluateResultDetail, None, None]) -> Generator[EvaluateDetail, None, None]:
         """ override
         TODO: Implement "get_evaluate_detail"
         :param filepath: Evaluation data file path. str
6 changes: 5 additions & 1 deletion test/core/test_dashboard_servicer.py
@@ -55,11 +55,15 @@ class RekcurdWorkerServicerTest(unittest.TestCase):
     """

     def setUp(self):
+        def gen_eval():
+            for d in eval_result_details:
+                yield d
+            return eval_result
         app.load_config_file("./test/test-settings.yml")
         app.data_server = DataServer(app.config)
         app.system_logger = JsonSystemLogger(config=app.config)
         app.service_logger = JsonServiceLogger(config=app.config)
-        app.evaluate = Mock(return_value=(eval_result, eval_result_details))
+        app.evaluate = Mock(return_value=gen_eval())
         self._real_time = grpc_testing.strict_real_time()
         self._fake_time = grpc_testing.strict_fake_time(time.time())
         servicer = RekcurdDashboardServicer(RekcurdPack(app, None))
17 changes: 10 additions & 7 deletions test/data_servers/test_data_server.py
@@ -1,7 +1,7 @@
 import unittest

 from rekcurd.protobuf import rekcurd_pb2
-from rekcurd.utils import RekcurdConfig, ModelModeEnum
+from rekcurd.utils import RekcurdConfig, ModelModeEnum, EvaluateResultDetail, EvaluateResult, PredictResult
 from rekcurd.data_servers import DataServer

 from . import patch_predictor
@@ -68,12 +68,15 @@ def test_upload_evaluation_data_invalid_path(self):
             self.data_server.upload_evaluation_data(request_iterator)

     @patch_predictor()
-    def test_upload_evaluation_result_summary(self):
-        self.assertEqual(self.data_server.upload_evaluation_result_summary(b'hoge', "test/eval/data"), "rekcurd-eval/data_eval_res.pkl")
-
-    @patch_predictor()
-    def test_upload_evaluation_result_detail(self):
-        self.assertEqual(self.data_server.upload_evaluation_result_detail(b'hoge', "test/eval/data"), "rekcurd-eval/data_eval_detail.pkl")
+    def test_upload_evaluation_result(self):
+        eval_res = EvaluateResult(1, 0.4, [0.5], [0.5], [0.5], [0.5], ['label'])
+
+        def generate_eval():
+            for _ in range(2):
+                yield EvaluateResultDetail(result=PredictResult('label', 0.5),
+                                           is_correct=True)
+            return eval_res
+        self.assertEqual(self.data_server.upload_evaluation_result(generate_eval(), "test/eval/data"), eval_res)

     @patch_predictor()
     def test_upload_evaluation_result_invalid_path(self):