docker based ci tool
Révision | 65142a1d8af111906f8c2f4fcc841cc5362977e3 |
---|---|
Taille | 4,473 octets |
l'heure | 2019-06-06 02:19:38 |
Auteur | hylom |
Message de Log | core-server: fix log extractor
|
#!/usr/bin/env python3
import os
import re
import pathlib
import time
import concurrent.futures
import json
import grpc
from concurrent import futures
import docker
from dockrun_pb2 import RunTaskRequest, RunTaskReply
from dockrun_pb2_grpc import DockRunServicer, add_DockRunServicer_to_server
from config import config as cfg
PORT = '[::]:1234'
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
OUTPUT_DIR = './results'
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def run_task(param, task):
print("run task: '{}'".format(param["task_name"]))
status = {
"succeeded": False,
"container_status": "Running",
"user_agent": param["user_agent"],
"parameter": {},
"parameter_type": param["parameter_type"],
};
for k in param["parameter"].keys():
status["parameter"][k] = param["parameter"][k]
reporter = FileReporter(param["task_name"], status)
reporter.write_status()
run_mode = task.get("run_mode", "run");
start_epoch = time.time()
if run_mode == "run":
image = task["image"]
client = docker.from_env()
container = client.containers.run(image=image, detach=True)
container.wait(timeout=task.get("timeout", task.get("timeout", 60)))
elif run_mode == "start":
client = docker.from_env()
container = client.containers.get(task["container"])
container.start()
container.wait(timeout=task.get("timeout", task.get("timeout", 60)))
print("task done.")
#lines = []
#for line in container.logs(stream=True):
# lines.append(line)
# logs = b"".join(lines)
logs = container.logs(since=start_epoch)
print("extract logs done.")
if run_mode == "run" and task.get("auto_remove", True):
container.remove()
print("container removed.")
reporter.set_log(logs)
status["container_status"] = "Done"
reporter.write_status()
reporter.write_log()
print("done.")
return True
class FileReporter():
def __init__(self, task_name, status, log=None):
self.task_name = task_name
self.status = status
self.log = log
self.output_basedir = os.path.join(OUTPUT_DIR, task_name)
self.output_dir = self._get_output_directory()
def set_log(self, log):
self.log = log
def write_log(self):
pn = self.output_dir / "log.txt"
with pn.open(mode='wb') as f:
f.write(self.log)
def write_status(self):
pn = self.output_dir / "status.json"
with pn.open(mode='w') as f:
json.dump(self.status, f)
def _get_output_directory(self):
if not os.path.exists(self.output_basedir):
os.mkdir(self.output_basedir)
p = pathlib.Path(self.output_basedir)
rex = re.compile(r'''^\d+$''')
last_num = 0
for x in p.iterdir():
if not x.is_dir():
continue
if rex.match(x.name) and last_num < int(x.name):
last_num = int(x.name)
out_dir = p.joinpath(str(last_num + 1))
out_dir.mkdir()
return out_dir
def _exec_done(future):
result = future.result()
print("task done.")
print(result)
class DockRun(DockRunServicer):
def RunTask(self, req, context):
tasks = cfg.get("tasks", {})
task = tasks.get(req.task_name)
print("RunTask: {}".format(req.task_name))
if not task:
message = "task '{}' not found.".format(req.task_name)
print(message);
return RunTaskReply(is_succeed=False, message=message)
# generate process
param = {
"task_name": req.task_name,
"client_name": req.client_name,
"user_agent": req.user_agent,
"parameter": req.parameter,
"parameter_type": req.parameter_type,
}
print("execute task...")
future = executor.submit(run_task, param, task)
future.add_done_callback(_exec_done)
return RunTaskReply(is_succeed=True, message="")
def main():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
add_DockRunServicer_to_server(
DockRun(), server)
server.add_insecure_port(PORT)
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
main()